rt/emul/compact/src/main/java/java/util/concurrent/SynchronousQueue.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 12:51:03 +0100
changeset 1895 bfaf3300b7ba
parent 1890 212417b74b72
permissions -rw-r--r--
Making java.util.concurrent package compilable except ForkJoinPool
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
    32  * assistance from members of JCP JSR-166 Expert Group and released to
    33  * the public domain, as explained at
    34  * http://creativecommons.org/publicdomain/zero/1.0/
    35  */
    36 
    37 package java.util.concurrent;
    38 import java.util.concurrent.locks.*;
    39 import java.util.*;
    40 
    41 /**
    42  * A {@linkplain BlockingQueue blocking queue} in which each insert
    43  * operation must wait for a corresponding remove operation by another
    44  * thread, and vice versa.  A synchronous queue does not have any
    45  * internal capacity, not even a capacity of one.  You cannot
    46  * <tt>peek</tt> at a synchronous queue because an element is only
    47  * present when you try to remove it; you cannot insert an element
    48  * (using any method) unless another thread is trying to remove it;
    49  * you cannot iterate as there is nothing to iterate.  The
    50  * <em>head</em> of the queue is the element that the first queued
    51  * inserting thread is trying to add to the queue; if there is no such
    52  * queued thread then no element is available for removal and
    53  * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
    54  * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
    55  * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
    56  * does not permit <tt>null</tt> elements.
    57  *
    58  * <p>Synchronous queues are similar to rendezvous channels used in
    59  * CSP and Ada. They are well suited for handoff designs, in which an
    60  * object running in one thread must sync up with an object running
    61  * in another thread in order to hand it some information, event, or
    62  * task.
    63  *
    64  * <p> This class supports an optional fairness policy for ordering
    65  * waiting producer and consumer threads.  By default, this ordering
    66  * is not guaranteed. However, a queue constructed with fairness set
    67  * to <tt>true</tt> grants threads access in FIFO order.
    68  *
    69  * <p>This class and its iterator implement all of the
    70  * <em>optional</em> methods of the {@link Collection} and {@link
    71  * Iterator} interfaces.
    72  *
    73  * <p>This class is a member of the
    74  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    75  * Java Collections Framework</a>.
    76  *
    77  * @since 1.5
    78  * @author Doug Lea and Bill Scherer and Michael Scott
    79  * @param <E> the type of elements held in this collection
    80  */
    81 public class SynchronousQueue<E> extends AbstractQueue<E>
    82     implements BlockingQueue<E>, java.io.Serializable {
    83     private static final long serialVersionUID = -3223113410248163686L;
    84 
    85     /*
    86      * This class implements extensions of the dual stack and dual
    87      * queue algorithms described in "Nonblocking Concurrent Objects
    88      * with Condition Synchronization", by W. N. Scherer III and
    89      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
    90      * Oct. 2004 (see also
    91      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
    92      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
    93      * queue for fair mode. The performance of the two is generally
    94      * similar. Fifo usually supports higher throughput under
    95      * contention but Lifo maintains higher thread locality in common
    96      * applications.
    97      *
    98      * A dual queue (and similarly stack) is one that at any given
    99      * time either holds "data" -- items provided by put operations,
   100      * or "requests" -- slots representing take operations, or is
   101      * empty. A call to "fulfill" (i.e., a call requesting an item
   102      * from a queue holding data or vice versa) dequeues a
   103      * complementary node.  The most interesting feature of these
   104      * queues is that any operation can figure out which mode the
   105      * queue is in, and act accordingly without needing locks.
   106      *
   107      * Both the queue and stack extend abstract class Transferer
   108      * defining the single method transfer that does a put or a
   109      * take. These are unified into a single method because in dual
   110      * data structures, the put and take operations are symmetrical,
   111      * so nearly all code can be combined. The resulting transfer
   112      * methods are on the long side, but are easier to follow than
   113      * they would be if broken up into nearly-duplicated parts.
   114      *
   115      * The queue and stack data structures share many conceptual
   116      * similarities but very few concrete details. For simplicity,
   117      * they are kept distinct so that they can later evolve
   118      * separately.
   119      *
   120      * The algorithms here differ from the versions in the above paper
   121      * in extending them for use in synchronous queues, as well as
   122      * dealing with cancellation. The main differences include:
   123      *
   124      *  1. The original algorithms used bit-marked pointers, but
   125      *     the ones here use mode bits in nodes, leading to a number
   126      *     of further adaptations.
   127      *  2. SynchronousQueues must block threads waiting to become
   128      *     fulfilled.
   129      *  3. Support for cancellation via timeout and interrupts,
   130      *     including cleaning out cancelled nodes/threads
   131      *     from lists to avoid garbage retention and memory depletion.
   132      *
   133      * Blocking is mainly accomplished using LockSupport park/unpark,
   134      * except that nodes that appear to be the next ones to become
   135      * fulfilled first spin a bit (on multiprocessors only). On very
   136      * busy synchronous queues, spinning can dramatically improve
   137      * throughput. And on less busy ones, the amount of spinning is
   138      * small enough not to be noticeable.
   139      *
   140      * Cleaning is done in different ways in queues vs stacks.  For
   141      * queues, we can almost always remove a node immediately in O(1)
   142      * time (modulo retries for consistency checks) when it is
   143      * cancelled. But if it may be pinned as the current tail, it must
   144      * wait until some subsequent cancellation. For stacks, we need a
   145      * potentially O(n) traversal to be sure that we can remove the
   146      * node, but this can run concurrently with other threads
   147      * accessing the stack.
   148      *
   149      * While garbage collection takes care of most node reclamation
   150      * issues that otherwise complicate nonblocking algorithms, care
   151      * is taken to "forget" references to data, other nodes, and
   152      * threads that might be held on to long-term by blocked
   153      * threads. In cases where setting to null would otherwise
   154      * conflict with main algorithms, this is done by changing a
   155      * node's link to now point to the node itself. This doesn't arise
   156      * much for Stack nodes (because blocked threads do not hang on to
   157      * old head pointers), but references in Queue nodes must be
   158      * aggressively forgotten to avoid reachability of everything any
   159      * node has ever referred to since arrival.
   160      */
   161 
   162     /**
   163      * Shared internal API for dual stacks and queues.
   164      */
   165     abstract static class Transferer {
   166         /**
   167          * Performs a put or take.
   168          *
   169          * @param e if non-null, the item to be handed to a consumer;
   170          *          if null, requests that transfer return an item
   171          *          offered by producer.
   172          * @param timed if this operation should timeout
   173          * @param nanos the timeout, in nanoseconds
   174          * @return if non-null, the item provided or received; if null,
   175          *         the operation failed due to timeout or interrupt --
   176          *         the caller can distinguish which of these occurred
   177          *         by checking Thread.interrupted.
   178          */
   179         abstract Object transfer(Object e, boolean timed, long nanos);
   180     }
   181 
   182     /** The number of CPUs, for spin control */
   183     static final int NCPUS = 1;
   184 
   185     /**
   186      * The number of times to spin before blocking in timed waits.
   187      * The value is empirically derived -- it works well across a
   188      * variety of processors and OSes. Empirically, the best value
   189      * seems not to vary with number of CPUs (beyond 2) so is just
   190      * a constant.
   191      */
   192     static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
   193 
   194     /**
   195      * The number of times to spin before blocking in untimed waits.
   196      * This is greater than timed value because untimed waits spin
   197      * faster since they don't need to check times on each spin.
   198      */
   199     static final int maxUntimedSpins = maxTimedSpins * 16;
   200 
   201     /**
   202      * The number of nanoseconds for which it is faster to spin
   203      * rather than to use timed park. A rough estimate suffices.
   204      */
   205     static final long spinForTimeoutThreshold = 1000L;
   206 
   207     /** Dual stack */
   208     static final class TransferStack extends Transferer {
   209         /*
   210          * This extends Scherer-Scott dual stack algorithm, differing,
   211          * among other ways, by using "covering" nodes rather than
   212          * bit-marked pointers: Fulfilling operations push on marker
   213          * nodes (with FULFILLING bit set in mode) to reserve a spot
   214          * to match a waiting node.
   215          */
   216 
   217         /* Modes for SNodes, ORed together in node fields */
   218         /** Node represents an unfulfilled consumer */
   219         static final int REQUEST    = 0;
   220         /** Node represents an unfulfilled producer */
   221         static final int DATA       = 1;
   222         /** Node is fulfilling another unfulfilled DATA or REQUEST */
   223         static final int FULFILLING = 2;
   224 
   225         /** Return true if m has fulfilling bit set */
   226         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
   227 
   228         /** Node class for TransferStacks. */
   229         static final class SNode {
   230             volatile SNode next;        // next node in stack
   231             volatile SNode match;       // the node matched to this
   232             volatile Thread waiter;     // to control park/unpark
   233             Object item;                // data; or null for REQUESTs
   234             int mode;
   235             // Note: item and mode fields don't need to be volatile
   236             // since they are always written before, and read after,
   237             // other volatile/atomic operations.
   238 
   239             SNode(Object item) {
   240                 this.item = item;
   241             }
   242 
   243             boolean casNext(SNode cmp, SNode val) {
   244                 if (next == cmp) {
   245                     next = val;
   246                     return true;
   247                 }
   248                 return false;
   249             }
   250 
   251             /**
   252              * Tries to match node s to this node, if so, waking up thread.
   253              * Fulfillers call tryMatch to identify their waiters.
   254              * Waiters block until they have been matched.
   255              *
   256              * @param s the node to match
   257              * @return true if successfully matched to s
   258              */
   259             boolean tryMatch(SNode s) {
   260                 if (match == null) {
   261                     match = s;
   262                     Thread w = waiter;
   263                     if (w != null) {    // waiters need at most one unpark
   264                         waiter = null;
   265                         LockSupport.unpark(w);
   266                     }
   267                     return true;
   268                 }
   269                 return match == s;
   270             }
   271 
   272             /**
   273              * Tries to cancel a wait by matching node to itself.
   274              */
   275             void tryCancel() {
   276                 if (match == null) {
   277                     match = this;
   278                 }
   279             }
   280 
   281             boolean isCancelled() {
   282                 return match == this;
   283             }
   284         }
   285 
   286         /** The head (top) of the stack */
   287         volatile SNode head;
   288 
   289         boolean casHead(SNode h, SNode nh) {
   290             if (head == h) {
   291                 head = nh;
   292                 return true;
   293             }
   294             return false;
   295         }
   296 
   297         /**
   298          * Creates or resets fields of a node. Called only from transfer
   299          * where the node to push on stack is lazily created and
   300          * reused when possible to help reduce intervals between reads
   301          * and CASes of head and to avoid surges of garbage when CASes
   302          * to push nodes fail due to contention.
   303          */
   304         static SNode snode(SNode s, Object e, SNode next, int mode) {
   305             if (s == null) s = new SNode(e);
   306             s.mode = mode;
   307             s.next = next;
   308             return s;
   309         }
   310 
   311         /**
   312          * Puts or takes an item.
   313          */
   314         Object transfer(Object e, boolean timed, long nanos) {
   315             /*
   316              * Basic algorithm is to loop trying one of three actions:
   317              *
   318              * 1. If apparently empty or already containing nodes of same
   319              *    mode, try to push node on stack and wait for a match,
   320              *    returning it, or null if cancelled.
   321              *
   322              * 2. If apparently containing node of complementary mode,
   323              *    try to push a fulfilling node on to stack, match
   324              *    with corresponding waiting node, pop both from
   325              *    stack, and return matched item. The matching or
   326              *    unlinking might not actually be necessary because of
   327              *    other threads performing action 3:
   328              *
   329              * 3. If top of stack already holds another fulfilling node,
   330              *    help it out by doing its match and/or pop
   331              *    operations, and then continue. The code for helping
   332              *    is essentially the same as for fulfilling, except
   333              *    that it doesn't return the item.
   334              */
   335 
   336             SNode s = null; // constructed/reused as needed
   337             int mode = (e == null) ? REQUEST : DATA;
   338 
   339             for (;;) {
   340                 SNode h = head;
   341                 if (h == null || h.mode == mode) {  // empty or same-mode
   342                     if (timed && nanos <= 0) {      // can't wait
   343                         if (h != null && h.isCancelled())
   344                             casHead(h, h.next);     // pop cancelled node
   345                         else
   346                             return null;
   347                     } else if (casHead(h, s = snode(s, e, h, mode))) {
   348                         SNode m = awaitFulfill(s, timed, nanos);
   349                         if (m == s) {               // wait was cancelled
   350                             clean(s);
   351                             return null;
   352                         }
   353                         if ((h = head) != null && h.next == s)
   354                             casHead(h, s.next);     // help s's fulfiller
   355                         return (mode == REQUEST) ? m.item : s.item;
   356                     }
   357                 } else if (!isFulfilling(h.mode)) { // try to fulfill
   358                     if (h.isCancelled())            // already cancelled
   359                         casHead(h, h.next);         // pop and retry
   360                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
   361                         for (;;) { // loop until matched or waiters disappear
   362                             SNode m = s.next;       // m is s's match
   363                             if (m == null) {        // all waiters are gone
   364                                 casHead(s, null);   // pop fulfill node
   365                                 s = null;           // use new node next time
   366                                 break;              // restart main loop
   367                             }
   368                             SNode mn = m.next;
   369                             if (m.tryMatch(s)) {
   370                                 casHead(s, mn);     // pop both s and m
   371                                 return (mode == REQUEST) ? m.item : s.item;
   372                             } else                  // lost match
   373                                 s.casNext(m, mn);   // help unlink
   374                         }
   375                     }
   376                 } else {                            // help a fulfiller
   377                     SNode m = h.next;               // m is h's match
   378                     if (m == null)                  // waiter is gone
   379                         casHead(h, null);           // pop fulfilling node
   380                     else {
   381                         SNode mn = m.next;
   382                         if (m.tryMatch(h))          // help match
   383                             casHead(h, mn);         // pop both h and m
   384                         else                        // lost match
   385                             h.casNext(m, mn);       // help unlink
   386                     }
   387                 }
   388             }
   389         }
   390 
   391         /**
   392          * Spins/blocks until node s is matched by a fulfill operation.
   393          *
   394          * @param s the waiting node
   395          * @param timed true if timed wait
   396          * @param nanos timeout value
   397          * @return matched node, or s if cancelled
   398          */
   399         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
   400             /*
   401              * When a node/thread is about to block, it sets its waiter
   402              * field and then rechecks state at least one more time
   403              * before actually parking, thus covering race vs
   404              * fulfiller noticing that waiter is non-null so should be
   405              * woken.
   406              *
   407              * When invoked by nodes that appear at the point of call
   408              * to be at the head of the stack, calls to park are
   409              * preceded by spins to avoid blocking when producers and
   410              * consumers are arriving very close in time.  This can
   411              * happen enough to bother only on multiprocessors.
   412              *
   413              * The order of checks for returning out of main loop
   414              * reflects fact that interrupts have precedence over
   415              * normal returns, which have precedence over
   416              * timeouts. (So, on timeout, one last check for match is
   417              * done before giving up.) Except that calls from untimed
   418              * SynchronousQueue.{poll/offer} don't check interrupts
   419              * and don't wait at all, so are trapped in transfer
   420              * method rather than calling awaitFulfill.
   421              */
   422             long lastTime = timed ? System.nanoTime() : 0;
   423             Thread w = Thread.currentThread();
   424             SNode h = head;
   425             int spins = (shouldSpin(s) ?
   426                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
   427             for (;;) {
   428                 if (w.isInterrupted())
   429                     s.tryCancel();
   430                 SNode m = s.match;
   431                 if (m != null)
   432                     return m;
   433                 if (timed) {
   434                     long now = System.nanoTime();
   435                     nanos -= now - lastTime;
   436                     lastTime = now;
   437                     if (nanos <= 0) {
   438                         s.tryCancel();
   439                         continue;
   440                     }
   441                 }
   442                 if (spins > 0)
   443                     spins = shouldSpin(s) ? (spins-1) : 0;
   444                 else if (s.waiter == null)
   445                     s.waiter = w; // establish waiter so can park next iter
   446                 else if (!timed)
   447                     LockSupport.park(this);
   448                 else if (nanos > spinForTimeoutThreshold)
   449                     LockSupport.parkNanos(this, nanos);
   450             }
   451         }
   452 
   453         /**
   454          * Returns true if node s is at head or there is an active
   455          * fulfiller.
   456          */
   457         boolean shouldSpin(SNode s) {
   458             SNode h = head;
   459             return (h == s || h == null || isFulfilling(h.mode));
   460         }
   461 
   462         /**
   463          * Unlinks s from the stack.
   464          */
   465         void clean(SNode s) {
   466             s.item = null;   // forget item
   467             s.waiter = null; // forget thread
   468 
   469             /*
   470              * At worst we may need to traverse entire stack to unlink
   471              * s. If there are multiple concurrent calls to clean, we
   472              * might not see s if another thread has already removed
   473              * it. But we can stop when we see any node known to
   474              * follow s. We use s.next unless it too is cancelled, in
   475              * which case we try the node one past. We don't check any
   476              * further because we don't want to doubly traverse just to
   477              * find sentinel.
   478              */
   479 
   480             SNode past = s.next;
   481             if (past != null && past.isCancelled())
   482                 past = past.next;
   483 
   484             // Absorb cancelled nodes at head
   485             SNode p;
   486             while ((p = head) != null && p != past && p.isCancelled())
   487                 casHead(p, p.next);
   488 
   489             // Unsplice embedded nodes
   490             while (p != null && p != past) {
   491                 SNode n = p.next;
   492                 if (n != null && n.isCancelled())
   493                     p.casNext(n, n.next);
   494                 else
   495                     p = n;
   496             }
   497         }
   498     }
   499 
   500     /** Dual Queue */
   501     static final class TransferQueue extends Transferer {
   502         /*
   503          * This extends Scherer-Scott dual queue algorithm, differing,
   504          * among other ways, by using modes within nodes rather than
   505          * marked pointers. The algorithm is a little simpler than
   506          * that for stacks because fulfillers do not need explicit
   507          * nodes, and matching is done by CAS'ing QNode.item field
   508          * from non-null to null (for put) or vice versa (for take).
   509          */
   510 
   511         /** Node class for TransferQueue. */
   512         static final class QNode {
   513             volatile QNode next;          // next node in queue
   514             volatile Object item;         // CAS'ed to or from null
   515             volatile Thread waiter;       // to control park/unpark
   516             final boolean isData;
   517 
   518             QNode(Object item, boolean isData) {
   519                 this.item = item;
   520                 this.isData = isData;
   521             }
   522 
   523             boolean casNext(QNode cmp, QNode val) {
   524                 if (next == cmp) {
   525                     next = val;
   526                     return true;
   527                 }
   528                 return false;
   529             }
   530 
   531             boolean casItem(Object cmp, Object val) {
   532                 if (item == cmp) {
   533                     item = val;
   534                     return true;
   535                 }
   536                 return false;
   537             }
   538 
   539             /**
   540              * Tries to cancel by CAS'ing ref to this as item.
   541              */
   542             void tryCancel(Object cmp) {
   543                 if (item == cmp) {
   544                     item = this;
   545                 }
   546             }
   547 
   548             boolean isCancelled() {
   549                 return item == this;
   550             }
   551 
   552             /**
   553              * Returns true if this node is known to be off the queue
   554              * because its next pointer has been forgotten due to
   555              * an advanceHead operation.
   556              */
   557             boolean isOffList() {
   558                 return next == this;
   559             }
   560         }
   561 
   562         /** Head of queue */
   563         transient volatile QNode head;
   564         /** Tail of queue */
   565         transient volatile QNode tail;
   566         /**
   567          * Reference to a cancelled node that might not yet have been
   568          * unlinked from queue because it was the last inserted node
   569          * when it cancelled.
   570          */
   571         transient volatile QNode cleanMe;
   572 
   573         TransferQueue() {
   574             QNode h = new QNode(null, false); // initialize to dummy node.
   575             head = h;
   576             tail = h;
   577         }
   578 
   579         /**
   580          * Tries to cas nh as new head; if successful, unlink
   581          * old head's next node to avoid garbage retention.
   582          */
   583         void advanceHead(QNode h, QNode nh) {
   584             if (head == h) {
   585                 head = nh;
   586                 h.next = h; // forget old next
   587             }
   588         }
   589 
   590         /**
   591          * Tries to cas nt as new tail.
   592          */
   593         void advanceTail(QNode t, QNode nt) {
   594             if (tail == t) {
   595                 tail = nt;
   596             }
   597         }
   598 
   599         /**
   600          * Tries to CAS cleanMe slot.
   601          */
   602         boolean casCleanMe(QNode cmp, QNode val) {
   603             if (cleanMe == cmp) {
   604                 cleanMe = val;
   605                 return true;
   606             }
   607             return false;
   608         }
   609 
   610         /**
   611          * Puts or takes an item.
   612          */
   613         Object transfer(Object e, boolean timed, long nanos) {
   614             /* Basic algorithm is to loop trying to take either of
   615              * two actions:
   616              *
   617              * 1. If queue apparently empty or holding same-mode nodes,
   618              *    try to add node to queue of waiters, wait to be
   619              *    fulfilled (or cancelled) and return matching item.
   620              *
   621              * 2. If queue apparently contains waiting items, and this
   622              *    call is of complementary mode, try to fulfill by CAS'ing
   623              *    item field of waiting node and dequeuing it, and then
   624              *    returning matching item.
   625              *
   626              * In each case, along the way, check for and try to help
   627              * advance head and tail on behalf of other stalled/slow
   628              * threads.
   629              *
   630              * The loop starts off with a null check guarding against
   631              * seeing uninitialized head or tail values. This never
   632              * happens in current SynchronousQueue, but could if
   633              * callers held non-volatile/final ref to the
   634              * transferer. The check is here anyway because it places
   635              * null checks at top of loop, which is usually faster
   636              * than having them implicitly interspersed.
   637              */
   638 
   639             QNode s = null; // constructed/reused as needed
   640             boolean isData = (e != null);
   641 
   642             for (;;) {
   643                 QNode t = tail;
   644                 QNode h = head;
   645                 if (t == null || h == null)         // saw uninitialized value
   646                     continue;                       // spin
   647 
   648                 if (h == t || t.isData == isData) { // empty or same-mode
   649                     QNode tn = t.next;
   650                     if (t != tail)                  // inconsistent read
   651                         continue;
   652                     if (tn != null) {               // lagging tail
   653                         advanceTail(t, tn);
   654                         continue;
   655                     }
   656                     if (timed && nanos <= 0)        // can't wait
   657                         return null;
   658                     if (s == null)
   659                         s = new QNode(e, isData);
   660                     if (!t.casNext(null, s))        // failed to link in
   661                         continue;
   662 
   663                     advanceTail(t, s);              // swing tail and wait
   664                     Object x = awaitFulfill(s, e, timed, nanos);
   665                     if (x == s) {                   // wait was cancelled
   666                         clean(t, s);
   667                         return null;
   668                     }
   669 
   670                     if (!s.isOffList()) {           // not already unlinked
   671                         advanceHead(t, s);          // unlink if head
   672                         if (x != null)              // and forget fields
   673                             s.item = s;
   674                         s.waiter = null;
   675                     }
   676                     return (x != null) ? x : e;
   677 
   678                 } else {                            // complementary-mode
   679                     QNode m = h.next;               // node to fulfill
   680                     if (t != tail || m == null || h != head)
   681                         continue;                   // inconsistent read
   682 
   683                     Object x = m.item;
   684                     if (isData == (x != null) ||    // m already fulfilled
   685                         x == m ||                   // m cancelled
   686                         !m.casItem(x, e)) {         // lost CAS
   687                         advanceHead(h, m);          // dequeue and retry
   688                         continue;
   689                     }
   690 
   691                     advanceHead(h, m);              // successfully fulfilled
   692                     LockSupport.unpark(m.waiter);
   693                     return (x != null) ? x : e;
   694                 }
   695             }
   696         }
   697 
        /**
         * Spins/blocks until node s is fulfilled or its wait is cancelled.
         *
         * The loop returns as soon as <tt>s.item</tt> no longer equals
         * <tt>e</tt>. A wait is cancelled via <tt>s.tryCancel(e)</tt> when
         * the current thread is interrupted or a timed wait runs out.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            // Baseline timestamp used to charge elapsed time against nanos.
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            // Spin only when s is the node directly behind head; any other
            // position gets a spin budget of zero.
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                // isInterrupted() leaves the interrupt status set.
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)             // item changed: fulfilled or cancelled
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        // Out of time: attempt to cancel, then loop so the
                        // item check above decides what to return.
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;       // record this thread before parking
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
                // Timed waits with nanos at or below spinForTimeoutThreshold
                // match no branch and simply go around the loop again.
            }
        }
   738 
        /**
         * Gets rid of cancelled node s with original predecessor pred.
         *
         * @param pred the node whose <tt>next</tt> referred to s when s was
         *        linked in
         * @param s the cancelled node to unlink
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)          // head and tail coincide: nothing to do
                    return;
                QNode tn = t.next;
                if (t != tail)       // tail moved while reading; retry
                    continue;
                if (tn != null) {    // tail is lagging; help advance it
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    // sn == s is the same self-link test used for
                    // "off list" in the d == dp check below.
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }
   792     }
   793 
    /**
     * The transferer through which every put/offer/take/poll is routed.
     * It is assigned in the constructor and again in readObject, so it
     * cannot be declared final without further complicating
     * serialization. Each public method reads it at most once, so
     * using volatile instead of final carries no noticeable
     * performance penalty.
     */
    private transient volatile Transferer transferer;
   802 
   803     /**
   804      * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
   805      */
   806     public SynchronousQueue() {
   807         this(false);
   808     }
   809 
   810     /**
   811      * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
   812      *
   813      * @param fair if true, waiting threads contend in FIFO order for
   814      *        access; otherwise the order is unspecified.
   815      */
   816     public SynchronousQueue(boolean fair) {
   817         transferer = fair ? new TransferQueue() : new TransferStack();
   818     }
   819 
   820     /**
   821      * Adds the specified element to this queue, waiting if necessary for
   822      * another thread to receive it.
   823      *
   824      * @throws InterruptedException {@inheritDoc}
   825      * @throws NullPointerException {@inheritDoc}
   826      */
   827     public void put(E o) throws InterruptedException {
   828         if (o == null) throw new NullPointerException();
   829         if (transferer.transfer(o, false, 0) == null) {
   830             Thread.interrupted();
   831             throw new InterruptedException();
   832         }
   833     }
   834 
   835     /**
   836      * Inserts the specified element into this queue, waiting if necessary
   837      * up to the specified wait time for another thread to receive it.
   838      *
   839      * @return <tt>true</tt> if successful, or <tt>false</tt> if the
   840      *         specified waiting time elapses before a consumer appears.
   841      * @throws InterruptedException {@inheritDoc}
   842      * @throws NullPointerException {@inheritDoc}
   843      */
   844     public boolean offer(E o, long timeout, TimeUnit unit)
   845         throws InterruptedException {
   846         if (o == null) throw new NullPointerException();
   847         if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
   848             return true;
   849         if (!Thread.interrupted())
   850             return false;
   851         throw new InterruptedException();
   852     }
   853 
   854     /**
   855      * Inserts the specified element into this queue, if another thread is
   856      * waiting to receive it.
   857      *
   858      * @param e the element to add
   859      * @return <tt>true</tt> if the element was added to this queue, else
   860      *         <tt>false</tt>
   861      * @throws NullPointerException if the specified element is null
   862      */
   863     public boolean offer(E e) {
   864         if (e == null) throw new NullPointerException();
   865         return transferer.transfer(e, true, 0) != null;
   866     }
   867 
   868     /**
   869      * Retrieves and removes the head of this queue, waiting if necessary
   870      * for another thread to insert it.
   871      *
   872      * @return the head of this queue
   873      * @throws InterruptedException {@inheritDoc}
   874      */
   875     public E take() throws InterruptedException {
   876         Object e = transferer.transfer(null, false, 0);
   877         if (e != null)
   878             return (E)e;
   879         Thread.interrupted();
   880         throw new InterruptedException();
   881     }
   882 
   883     /**
   884      * Retrieves and removes the head of this queue, waiting
   885      * if necessary up to the specified wait time, for another thread
   886      * to insert it.
   887      *
   888      * @return the head of this queue, or <tt>null</tt> if the
   889      *         specified waiting time elapses before an element is present.
   890      * @throws InterruptedException {@inheritDoc}
   891      */
   892     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   893         Object e = transferer.transfer(null, true, unit.toNanos(timeout));
   894         if (e != null || !Thread.interrupted())
   895             return (E)e;
   896         throw new InterruptedException();
   897     }
   898 
   899     /**
   900      * Retrieves and removes the head of this queue, if another thread
   901      * is currently making an element available.
   902      *
   903      * @return the head of this queue, or <tt>null</tt> if no
   904      *         element is available.
   905      */
   906     public E poll() {
   907         return (E)transferer.transfer(null, true, 0);
   908     }
   909 
   910     /**
   911      * Always returns <tt>true</tt>.
   912      * A <tt>SynchronousQueue</tt> has no internal capacity.
   913      *
   914      * @return <tt>true</tt>
   915      */
   916     public boolean isEmpty() {
   917         return true;
   918     }
   919 
   920     /**
   921      * Always returns zero.
   922      * A <tt>SynchronousQueue</tt> has no internal capacity.
   923      *
   924      * @return zero.
   925      */
   926     public int size() {
   927         return 0;
   928     }
   929 
   930     /**
   931      * Always returns zero.
   932      * A <tt>SynchronousQueue</tt> has no internal capacity.
   933      *
   934      * @return zero.
   935      */
   936     public int remainingCapacity() {
   937         return 0;
   938     }
   939 
   940     /**
   941      * Does nothing.
   942      * A <tt>SynchronousQueue</tt> has no internal capacity.
   943      */
   944     public void clear() {
   945     }
   946 
   947     /**
   948      * Always returns <tt>false</tt>.
   949      * A <tt>SynchronousQueue</tt> has no internal capacity.
   950      *
   951      * @param o the element
   952      * @return <tt>false</tt>
   953      */
   954     public boolean contains(Object o) {
   955         return false;
   956     }
   957 
   958     /**
   959      * Always returns <tt>false</tt>.
   960      * A <tt>SynchronousQueue</tt> has no internal capacity.
   961      *
   962      * @param o the element to remove
   963      * @return <tt>false</tt>
   964      */
   965     public boolean remove(Object o) {
   966         return false;
   967     }
   968 
   969     /**
   970      * Returns <tt>false</tt> unless the given collection is empty.
   971      * A <tt>SynchronousQueue</tt> has no internal capacity.
   972      *
   973      * @param c the collection
   974      * @return <tt>false</tt> unless given collection is empty
   975      */
   976     public boolean containsAll(Collection<?> c) {
   977         return c.isEmpty();
   978     }
   979 
   980     /**
   981      * Always returns <tt>false</tt>.
   982      * A <tt>SynchronousQueue</tt> has no internal capacity.
   983      *
   984      * @param c the collection
   985      * @return <tt>false</tt>
   986      */
   987     public boolean removeAll(Collection<?> c) {
   988         return false;
   989     }
   990 
   991     /**
   992      * Always returns <tt>false</tt>.
   993      * A <tt>SynchronousQueue</tt> has no internal capacity.
   994      *
   995      * @param c the collection
   996      * @return <tt>false</tt>
   997      */
   998     public boolean retainAll(Collection<?> c) {
   999         return false;
  1000     }
  1001 
  1002     /**
  1003      * Always returns <tt>null</tt>.
  1004      * A <tt>SynchronousQueue</tt> does not return elements
  1005      * unless actively waited on.
  1006      *
  1007      * @return <tt>null</tt>
  1008      */
  1009     public E peek() {
  1010         return null;
  1011     }
  1012 
  1013     /**
  1014      * Returns an empty iterator in which <tt>hasNext</tt> always returns
  1015      * <tt>false</tt>.
  1016      *
  1017      * @return an empty iterator
  1018      */
  1019     public Iterator<E> iterator() {
  1020         return Collections.emptyIterator();
  1021     }
  1022 
  1023     /**
  1024      * Returns a zero-length array.
  1025      * @return a zero-length array
  1026      */
  1027     public Object[] toArray() {
  1028         return new Object[0];
  1029     }
  1030 
  1031     /**
  1032      * Sets the zeroeth element of the specified array to <tt>null</tt>
  1033      * (if the array has non-zero length) and returns it.
  1034      *
  1035      * @param a the array
  1036      * @return the specified array
  1037      * @throws NullPointerException if the specified array is null
  1038      */
  1039     public <T> T[] toArray(T[] a) {
  1040         if (a.length > 0)
  1041             a[0] = null;
  1042         return a;
  1043     }
  1044 
  1045     /**
  1046      * @throws UnsupportedOperationException {@inheritDoc}
  1047      * @throws ClassCastException            {@inheritDoc}
  1048      * @throws NullPointerException          {@inheritDoc}
  1049      * @throws IllegalArgumentException      {@inheritDoc}
  1050      */
  1051     public int drainTo(Collection<? super E> c) {
  1052         if (c == null)
  1053             throw new NullPointerException();
  1054         if (c == this)
  1055             throw new IllegalArgumentException();
  1056         int n = 0;
  1057         E e;
  1058         while ( (e = poll()) != null) {
  1059             c.add(e);
  1060             ++n;
  1061         }
  1062         return n;
  1063     }
  1064 
  1065     /**
  1066      * @throws UnsupportedOperationException {@inheritDoc}
  1067      * @throws ClassCastException            {@inheritDoc}
  1068      * @throws NullPointerException          {@inheritDoc}
  1069      * @throws IllegalArgumentException      {@inheritDoc}
  1070      */
  1071     public int drainTo(Collection<? super E> c, int maxElements) {
  1072         if (c == null)
  1073             throw new NullPointerException();
  1074         if (c == this)
  1075             throw new IllegalArgumentException();
  1076         int n = 0;
  1077         E e;
  1078         while (n < maxElements && (e = poll()) != null) {
  1079             c.add(e);
  1080             ++n;
  1081         }
  1082         return n;
  1083     }
  1084 
    /*
     * To cope with the serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used for queue operations; writeObject
     * fills them in just before serializing, and readObject inspects
     * waitingProducers to pick the transferer implementation.
     */

    // Common base type of the two placeholder queues below.
    static class WaitQueue implements java.io.Serializable { }
    // Placeholder written by writeObject when the transferer is not a TransferQueue.
    static class LifoWaitQueue extends WaitQueue {
        // NOTE(review): presumably fixed for stream compatibility -- do not change without confirming.
        private static final long serialVersionUID = -3633113410248163686L;
    }
    // Placeholder written by writeObject when the transferer is a TransferQueue.
    static class FifoWaitQueue extends WaitQueue {
        // NOTE(review): presumably fixed for stream compatibility -- do not change without confirming.
        private static final long serialVersionUID = -3623113410248163686L;
    }
    // Non-transient, so these are part of the default serialized form.
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;
  1103 
  1104     /**
  1105      * Save the state to a stream (that is, serialize it).
  1106      *
  1107      * @param s the stream
  1108      */
  1109     private void writeObject(java.io.ObjectOutputStream s)
  1110         throws java.io.IOException {
  1111         boolean fair = transferer instanceof TransferQueue;
  1112         if (fair) {
  1113             qlock = new ReentrantLock(true);
  1114             waitingProducers = new FifoWaitQueue();
  1115             waitingConsumers = new FifoWaitQueue();
  1116         }
  1117         else {
  1118             qlock = new ReentrantLock();
  1119             waitingProducers = new LifoWaitQueue();
  1120             waitingConsumers = new LifoWaitQueue();
  1121         }
  1122         s.defaultWriteObject();
  1123     }
  1124 
  1125     private void readObject(final java.io.ObjectInputStream s)
  1126         throws java.io.IOException, ClassNotFoundException {
  1127         s.defaultReadObject();
  1128         if (waitingProducers instanceof FifoWaitQueue)
  1129             transferer = new TransferQueue();
  1130         else
  1131             transferer = new TransferStack();
  1132     }
  1133 
  1134 
  1135 }