rt/emul/compact/src/main/java/java/util/concurrent/SynchronousQueue.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branchjdk7-b147
changeset 1890 212417b74b72
child 1895 bfaf3300b7ba
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
    32  * assistance from members of JCP JSR-166 Expert Group and released to
    33  * the public domain, as explained at
    34  * http://creativecommons.org/publicdomain/zero/1.0/
    35  */
    36 
    37 package java.util.concurrent;
    38 import java.util.concurrent.locks.*;
    39 import java.util.concurrent.atomic.*;
    40 import java.util.*;
    41 
    42 /**
    43  * A {@linkplain BlockingQueue blocking queue} in which each insert
    44  * operation must wait for a corresponding remove operation by another
    45  * thread, and vice versa.  A synchronous queue does not have any
    46  * internal capacity, not even a capacity of one.  You cannot
    47  * <tt>peek</tt> at a synchronous queue because an element is only
    48  * present when you try to remove it; you cannot insert an element
    49  * (using any method) unless another thread is trying to remove it;
    50  * you cannot iterate as there is nothing to iterate.  The
    51  * <em>head</em> of the queue is the element that the first queued
    52  * inserting thread is trying to add to the queue; if there is no such
    53  * queued thread then no element is available for removal and
    54  * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
    55  * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
    56  * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
    57  * does not permit <tt>null</tt> elements.
    58  *
    59  * <p>Synchronous queues are similar to rendezvous channels used in
    60  * CSP and Ada. They are well suited for handoff designs, in which an
    61  * object running in one thread must sync up with an object running
    62  * in another thread in order to hand it some information, event, or
    63  * task.
    64  *
    65  * <p> This class supports an optional fairness policy for ordering
    66  * waiting producer and consumer threads.  By default, this ordering
    67  * is not guaranteed. However, a queue constructed with fairness set
    68  * to <tt>true</tt> grants threads access in FIFO order.
    69  *
    70  * <p>This class and its iterator implement all of the
    71  * <em>optional</em> methods of the {@link Collection} and {@link
    72  * Iterator} interfaces.
    73  *
    74  * <p>This class is a member of the
    75  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    76  * Java Collections Framework</a>.
    77  *
    78  * @since 1.5
    79  * @author Doug Lea and Bill Scherer and Michael Scott
    80  * @param <E> the type of elements held in this collection
    81  */
    82 public class SynchronousQueue<E> extends AbstractQueue<E>
    83     implements BlockingQueue<E>, java.io.Serializable {
    84     private static final long serialVersionUID = -3223113410248163686L;
    85 
    86     /*
    87      * This class implements extensions of the dual stack and dual
    88      * queue algorithms described in "Nonblocking Concurrent Objects
    89      * with Condition Synchronization", by W. N. Scherer III and
    90      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
    91      * Oct. 2004 (see also
    92      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
    93      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
    94      * queue for fair mode. The performance of the two is generally
    95      * similar. Fifo usually supports higher throughput under
    96      * contention but Lifo maintains higher thread locality in common
    97      * applications.
    98      *
    99      * A dual queue (and similarly stack) is one that at any given
   100      * time either holds "data" -- items provided by put operations,
   101      * or "requests" -- slots representing take operations, or is
   102      * empty. A call to "fulfill" (i.e., a call requesting an item
   103      * from a queue holding data or vice versa) dequeues a
   104      * complementary node.  The most interesting feature of these
   105      * queues is that any operation can figure out which mode the
   106      * queue is in, and act accordingly without needing locks.
   107      *
   108      * Both the queue and stack extend abstract class Transferer
   109      * defining the single method transfer that does a put or a
   110      * take. These are unified into a single method because in dual
   111      * data structures, the put and take operations are symmetrical,
   112      * so nearly all code can be combined. The resulting transfer
   113      * methods are on the long side, but are easier to follow than
   114      * they would be if broken up into nearly-duplicated parts.
   115      *
   116      * The queue and stack data structures share many conceptual
   117      * similarities but very few concrete details. For simplicity,
   118      * they are kept distinct so that they can later evolve
   119      * separately.
   120      *
   121      * The algorithms here differ from the versions in the above paper
   122      * in extending them for use in synchronous queues, as well as
   123      * dealing with cancellation. The main differences include:
   124      *
   125      *  1. The original algorithms used bit-marked pointers, but
   126      *     the ones here use mode bits in nodes, leading to a number
   127      *     of further adaptations.
   128      *  2. SynchronousQueues must block threads waiting to become
   129      *     fulfilled.
   130      *  3. Support for cancellation via timeout and interrupts,
   131      *     including cleaning out cancelled nodes/threads
   132      *     from lists to avoid garbage retention and memory depletion.
   133      *
   134      * Blocking is mainly accomplished using LockSupport park/unpark,
   135      * except that nodes that appear to be the next ones to become
   136      * fulfilled first spin a bit (on multiprocessors only). On very
   137      * busy synchronous queues, spinning can dramatically improve
   138      * throughput. And on less busy ones, the amount of spinning is
   139      * small enough not to be noticeable.
   140      *
   141      * Cleaning is done in different ways in queues vs stacks.  For
   142      * queues, we can almost always remove a node immediately in O(1)
   143      * time (modulo retries for consistency checks) when it is
   144      * cancelled. But if it may be pinned as the current tail, it must
   145      * wait until some subsequent cancellation. For stacks, we need a
   146      * potentially O(n) traversal to be sure that we can remove the
   147      * node, but this can run concurrently with other threads
   148      * accessing the stack.
   149      *
   150      * While garbage collection takes care of most node reclamation
   151      * issues that otherwise complicate nonblocking algorithms, care
   152      * is taken to "forget" references to data, other nodes, and
   153      * threads that might be held on to long-term by blocked
   154      * threads. In cases where setting to null would otherwise
   155      * conflict with main algorithms, this is done by changing a
   156      * node's link to now point to the node itself. This doesn't arise
   157      * much for Stack nodes (because blocked threads do not hang on to
   158      * old head pointers), but references in Queue nodes must be
   159      * aggressively forgotten to avoid reachability of everything any
   160      * node has ever referred to since arrival.
   161      */
   162 
   163     /**
   164      * Shared internal API for dual stacks and queues.
   165      */
   166     abstract static class Transferer {
   167         /**
   168          * Performs a put or take.
   169          *
   170          * @param e if non-null, the item to be handed to a consumer;
   171          *          if null, requests that transfer return an item
   172          *          offered by producer.
   173          * @param timed if this operation should timeout
   174          * @param nanos the timeout, in nanoseconds
   175          * @return if non-null, the item provided or received; if null,
   176          *         the operation failed due to timeout or interrupt --
   177          *         the caller can distinguish which of these occurred
   178          *         by checking Thread.interrupted.
   179          */
   180         abstract Object transfer(Object e, boolean timed, long nanos);
   181     }
   182 
   183     /** The number of CPUs, for spin control */
   184     static final int NCPUS = Runtime.getRuntime().availableProcessors();
   185 
   186     /**
   187      * The number of times to spin before blocking in timed waits.
   188      * The value is empirically derived -- it works well across a
   189      * variety of processors and OSes. Empirically, the best value
   190      * seems not to vary with number of CPUs (beyond 2) so is just
   191      * a constant.
   192      */
   193     static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
   194 
   195     /**
   196      * The number of times to spin before blocking in untimed waits.
   197      * This is greater than timed value because untimed waits spin
   198      * faster since they don't need to check times on each spin.
   199      */
   200     static final int maxUntimedSpins = maxTimedSpins * 16;
   201 
   202     /**
   203      * The number of nanoseconds for which it is faster to spin
   204      * rather than to use timed park. A rough estimate suffices.
   205      */
   206     static final long spinForTimeoutThreshold = 1000L;
   207 
   208     /** Dual stack */
   209     static final class TransferStack extends Transferer {
   210         /*
   211          * This extends Scherer-Scott dual stack algorithm, differing,
   212          * among other ways, by using "covering" nodes rather than
   213          * bit-marked pointers: Fulfilling operations push on marker
   214          * nodes (with FULFILLING bit set in mode) to reserve a spot
   215          * to match a waiting node.
   216          */
   217 
   218         /* Modes for SNodes, ORed together in node fields */
   219         /** Node represents an unfulfilled consumer */
   220         static final int REQUEST    = 0;
   221         /** Node represents an unfulfilled producer */
   222         static final int DATA       = 1;
   223         /** Node is fulfilling another unfulfilled DATA or REQUEST */
   224         static final int FULFILLING = 2;
   225 
   226         /** Return true if m has fulfilling bit set */
   227         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
   228 
   229         /** Node class for TransferStacks. */
   230         static final class SNode {
   231             volatile SNode next;        // next node in stack
   232             volatile SNode match;       // the node matched to this
   233             volatile Thread waiter;     // to control park/unpark
   234             Object item;                // data; or null for REQUESTs
   235             int mode;
   236             // Note: item and mode fields don't need to be volatile
   237             // since they are always written before, and read after,
   238             // other volatile/atomic operations.
   239 
   240             SNode(Object item) {
   241                 this.item = item;
   242             }
   243 
   244             boolean casNext(SNode cmp, SNode val) {
   245                 return cmp == next &&
   246                     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
   247             }
   248 
   249             /**
   250              * Tries to match node s to this node, if so, waking up thread.
   251              * Fulfillers call tryMatch to identify their waiters.
   252              * Waiters block until they have been matched.
   253              *
   254              * @param s the node to match
   255              * @return true if successfully matched to s
   256              */
   257             boolean tryMatch(SNode s) {
   258                 if (match == null &&
   259                     UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
   260                     Thread w = waiter;
   261                     if (w != null) {    // waiters need at most one unpark
   262                         waiter = null;
   263                         LockSupport.unpark(w);
   264                     }
   265                     return true;
   266                 }
   267                 return match == s;
   268             }
   269 
   270             /**
   271              * Tries to cancel a wait by matching node to itself.
   272              */
   273             void tryCancel() {
   274                 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
   275             }
   276 
   277             boolean isCancelled() {
   278                 return match == this;
   279             }
   280 
   281             // Unsafe mechanics
   282             private static final sun.misc.Unsafe UNSAFE;
   283             private static final long matchOffset;
   284             private static final long nextOffset;
   285 
   286             static {
   287                 try {
   288                     UNSAFE = sun.misc.Unsafe.getUnsafe();
   289                     Class k = SNode.class;
   290                     matchOffset = UNSAFE.objectFieldOffset
   291                         (k.getDeclaredField("match"));
   292                     nextOffset = UNSAFE.objectFieldOffset
   293                         (k.getDeclaredField("next"));
   294                 } catch (Exception e) {
   295                     throw new Error(e);
   296                 }
   297             }
   298         }
   299 
   300         /** The head (top) of the stack */
   301         volatile SNode head;
   302 
   303         boolean casHead(SNode h, SNode nh) {
   304             return h == head &&
   305                 UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
   306         }
   307 
   308         /**
   309          * Creates or resets fields of a node. Called only from transfer
   310          * where the node to push on stack is lazily created and
   311          * reused when possible to help reduce intervals between reads
   312          * and CASes of head and to avoid surges of garbage when CASes
   313          * to push nodes fail due to contention.
   314          */
   315         static SNode snode(SNode s, Object e, SNode next, int mode) {
   316             if (s == null) s = new SNode(e);
   317             s.mode = mode;
   318             s.next = next;
   319             return s;
   320         }
   321 
   322         /**
   323          * Puts or takes an item.
   324          */
   325         Object transfer(Object e, boolean timed, long nanos) {
   326             /*
   327              * Basic algorithm is to loop trying one of three actions:
   328              *
   329              * 1. If apparently empty or already containing nodes of same
   330              *    mode, try to push node on stack and wait for a match,
   331              *    returning it, or null if cancelled.
   332              *
   333              * 2. If apparently containing node of complementary mode,
   334              *    try to push a fulfilling node on to stack, match
   335              *    with corresponding waiting node, pop both from
   336              *    stack, and return matched item. The matching or
   337              *    unlinking might not actually be necessary because of
   338              *    other threads performing action 3:
   339              *
   340              * 3. If top of stack already holds another fulfilling node,
   341              *    help it out by doing its match and/or pop
   342              *    operations, and then continue. The code for helping
   343              *    is essentially the same as for fulfilling, except
   344              *    that it doesn't return the item.
   345              */
   346 
   347             SNode s = null; // constructed/reused as needed
   348             int mode = (e == null) ? REQUEST : DATA;
   349 
   350             for (;;) {
   351                 SNode h = head;
   352                 if (h == null || h.mode == mode) {  // empty or same-mode
   353                     if (timed && nanos <= 0) {      // can't wait
   354                         if (h != null && h.isCancelled())
   355                             casHead(h, h.next);     // pop cancelled node
   356                         else
   357                             return null;
   358                     } else if (casHead(h, s = snode(s, e, h, mode))) {
   359                         SNode m = awaitFulfill(s, timed, nanos);
   360                         if (m == s) {               // wait was cancelled
   361                             clean(s);
   362                             return null;
   363                         }
   364                         if ((h = head) != null && h.next == s)
   365                             casHead(h, s.next);     // help s's fulfiller
   366                         return (mode == REQUEST) ? m.item : s.item;
   367                     }
   368                 } else if (!isFulfilling(h.mode)) { // try to fulfill
   369                     if (h.isCancelled())            // already cancelled
   370                         casHead(h, h.next);         // pop and retry
   371                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
   372                         for (;;) { // loop until matched or waiters disappear
   373                             SNode m = s.next;       // m is s's match
   374                             if (m == null) {        // all waiters are gone
   375                                 casHead(s, null);   // pop fulfill node
   376                                 s = null;           // use new node next time
   377                                 break;              // restart main loop
   378                             }
   379                             SNode mn = m.next;
   380                             if (m.tryMatch(s)) {
   381                                 casHead(s, mn);     // pop both s and m
   382                                 return (mode == REQUEST) ? m.item : s.item;
   383                             } else                  // lost match
   384                                 s.casNext(m, mn);   // help unlink
   385                         }
   386                     }
   387                 } else {                            // help a fulfiller
   388                     SNode m = h.next;               // m is h's match
   389                     if (m == null)                  // waiter is gone
   390                         casHead(h, null);           // pop fulfilling node
   391                     else {
   392                         SNode mn = m.next;
   393                         if (m.tryMatch(h))          // help match
   394                             casHead(h, mn);         // pop both h and m
   395                         else                        // lost match
   396                             h.casNext(m, mn);       // help unlink
   397                     }
   398                 }
   399             }
   400         }
   401 
   402         /**
   403          * Spins/blocks until node s is matched by a fulfill operation.
   404          *
   405          * @param s the waiting node
   406          * @param timed true if timed wait
   407          * @param nanos timeout value
   408          * @return matched node, or s if cancelled
   409          */
   410         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
   411             /*
   412              * When a node/thread is about to block, it sets its waiter
   413              * field and then rechecks state at least one more time
   414              * before actually parking, thus covering race vs
   415              * fulfiller noticing that waiter is non-null so should be
   416              * woken.
   417              *
   418              * When invoked by nodes that appear at the point of call
   419              * to be at the head of the stack, calls to park are
   420              * preceded by spins to avoid blocking when producers and
   421              * consumers are arriving very close in time.  This can
   422              * happen enough to bother only on multiprocessors.
   423              *
   424              * The order of checks for returning out of main loop
   425              * reflects fact that interrupts have precedence over
   426              * normal returns, which have precedence over
   427              * timeouts. (So, on timeout, one last check for match is
   428              * done before giving up.) Except that calls from untimed
   429              * SynchronousQueue.{poll/offer} don't check interrupts
   430              * and don't wait at all, so are trapped in transfer
   431              * method rather than calling awaitFulfill.
   432              */
   433             long lastTime = timed ? System.nanoTime() : 0;
   434             Thread w = Thread.currentThread();
   435             SNode h = head;
   436             int spins = (shouldSpin(s) ?
   437                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
   438             for (;;) {
   439                 if (w.isInterrupted())
   440                     s.tryCancel();
   441                 SNode m = s.match;
   442                 if (m != null)
   443                     return m;
   444                 if (timed) {
   445                     long now = System.nanoTime();
   446                     nanos -= now - lastTime;
   447                     lastTime = now;
   448                     if (nanos <= 0) {
   449                         s.tryCancel();
   450                         continue;
   451                     }
   452                 }
   453                 if (spins > 0)
   454                     spins = shouldSpin(s) ? (spins-1) : 0;
   455                 else if (s.waiter == null)
   456                     s.waiter = w; // establish waiter so can park next iter
   457                 else if (!timed)
   458                     LockSupport.park(this);
   459                 else if (nanos > spinForTimeoutThreshold)
   460                     LockSupport.parkNanos(this, nanos);
   461             }
   462         }
   463 
   464         /**
   465          * Returns true if node s is at head or there is an active
   466          * fulfiller.
   467          */
   468         boolean shouldSpin(SNode s) {
   469             SNode h = head;
   470             return (h == s || h == null || isFulfilling(h.mode));
   471         }
   472 
   473         /**
   474          * Unlinks s from the stack.
   475          */
   476         void clean(SNode s) {
   477             s.item = null;   // forget item
   478             s.waiter = null; // forget thread
   479 
   480             /*
   481              * At worst we may need to traverse entire stack to unlink
   482              * s. If there are multiple concurrent calls to clean, we
   483              * might not see s if another thread has already removed
   484              * it. But we can stop when we see any node known to
   485              * follow s. We use s.next unless it too is cancelled, in
   486              * which case we try the node one past. We don't check any
   487              * further because we don't want to doubly traverse just to
   488              * find sentinel.
   489              */
   490 
   491             SNode past = s.next;
   492             if (past != null && past.isCancelled())
   493                 past = past.next;
   494 
   495             // Absorb cancelled nodes at head
   496             SNode p;
   497             while ((p = head) != null && p != past && p.isCancelled())
   498                 casHead(p, p.next);
   499 
   500             // Unsplice embedded nodes
   501             while (p != null && p != past) {
   502                 SNode n = p.next;
   503                 if (n != null && n.isCancelled())
   504                     p.casNext(n, n.next);
   505                 else
   506                     p = n;
   507             }
   508         }
   509 
   510         // Unsafe mechanics
   511         private static final sun.misc.Unsafe UNSAFE;
   512         private static final long headOffset;
   513         static {
   514             try {
   515                 UNSAFE = sun.misc.Unsafe.getUnsafe();
   516                 Class k = TransferStack.class;
   517                 headOffset = UNSAFE.objectFieldOffset
   518                     (k.getDeclaredField("head"));
   519             } catch (Exception e) {
   520                 throw new Error(e);
   521             }
   522         }
   523     }
   524 
   525     /** Dual Queue */
   526     static final class TransferQueue extends Transferer {
   527         /*
   528          * This extends Scherer-Scott dual queue algorithm, differing,
   529          * among other ways, by using modes within nodes rather than
   530          * marked pointers. The algorithm is a little simpler than
   531          * that for stacks because fulfillers do not need explicit
   532          * nodes, and matching is done by CAS'ing QNode.item field
   533          * from non-null to null (for put) or vice versa (for take).
   534          */
   535 
   536         /** Node class for TransferQueue. */
   537         static final class QNode {
   538             volatile QNode next;          // next node in queue
   539             volatile Object item;         // CAS'ed to or from null
   540             volatile Thread waiter;       // to control park/unpark
   541             final boolean isData;
   542 
   543             QNode(Object item, boolean isData) {
   544                 this.item = item;
   545                 this.isData = isData;
   546             }
   547 
   548             boolean casNext(QNode cmp, QNode val) {
   549                 return next == cmp &&
   550                     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
   551             }
   552 
   553             boolean casItem(Object cmp, Object val) {
   554                 return item == cmp &&
   555                     UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
   556             }
   557 
   558             /**
   559              * Tries to cancel by CAS'ing ref to this as item.
   560              */
   561             void tryCancel(Object cmp) {
   562                 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
   563             }
   564 
   565             boolean isCancelled() {
   566                 return item == this;
   567             }
   568 
   569             /**
   570              * Returns true if this node is known to be off the queue
   571              * because its next pointer has been forgotten due to
   572              * an advanceHead operation.
   573              */
   574             boolean isOffList() {
   575                 return next == this;
   576             }
   577 
   578             // Unsafe mechanics
   579             private static final sun.misc.Unsafe UNSAFE;
   580             private static final long itemOffset;
   581             private static final long nextOffset;
   582 
   583             static {
   584                 try {
   585                     UNSAFE = sun.misc.Unsafe.getUnsafe();
   586                     Class k = QNode.class;
   587                     itemOffset = UNSAFE.objectFieldOffset
   588                         (k.getDeclaredField("item"));
   589                     nextOffset = UNSAFE.objectFieldOffset
   590                         (k.getDeclaredField("next"));
   591                 } catch (Exception e) {
   592                     throw new Error(e);
   593                 }
   594             }
   595         }
   596 
   597         /** Head of queue */
   598         transient volatile QNode head;
   599         /** Tail of queue */
   600         transient volatile QNode tail;
   601         /**
   602          * Reference to a cancelled node that might not yet have been
   603          * unlinked from queue because it was the last inserted node
   604          * when it cancelled.
   605          */
   606         transient volatile QNode cleanMe;
   607 
   608         TransferQueue() {
   609             QNode h = new QNode(null, false); // initialize to dummy node.
   610             head = h;
   611             tail = h;
   612         }
   613 
   614         /**
   615          * Tries to cas nh as new head; if successful, unlink
   616          * old head's next node to avoid garbage retention.
   617          */
   618         void advanceHead(QNode h, QNode nh) {
   619             if (h == head &&
   620                 UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
   621                 h.next = h; // forget old next
   622         }
   623 
   624         /**
   625          * Tries to cas nt as new tail.
   626          */
   627         void advanceTail(QNode t, QNode nt) {
   628             if (tail == t)
   629                 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
   630         }
   631 
   632         /**
   633          * Tries to CAS cleanMe slot.
   634          */
   635         boolean casCleanMe(QNode cmp, QNode val) {
   636             return cleanMe == cmp &&
   637                 UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
   638         }
   639 
   640         /**
   641          * Puts or takes an item.
   642          */
   643         Object transfer(Object e, boolean timed, long nanos) {
   644             /* Basic algorithm is to loop trying to take either of
   645              * two actions:
   646              *
   647              * 1. If queue apparently empty or holding same-mode nodes,
   648              *    try to add node to queue of waiters, wait to be
   649              *    fulfilled (or cancelled) and return matching item.
   650              *
   651              * 2. If queue apparently contains waiting items, and this
   652              *    call is of complementary mode, try to fulfill by CAS'ing
   653              *    item field of waiting node and dequeuing it, and then
   654              *    returning matching item.
   655              *
   656              * In each case, along the way, check for and try to help
   657              * advance head and tail on behalf of other stalled/slow
   658              * threads.
   659              *
   660              * The loop starts off with a null check guarding against
   661              * seeing uninitialized head or tail values. This never
   662              * happens in current SynchronousQueue, but could if
   663              * callers held non-volatile/final ref to the
   664              * transferer. The check is here anyway because it places
   665              * null checks at top of loop, which is usually faster
   666              * than having them implicitly interspersed.
   667              */
   668 
   669             QNode s = null; // constructed/reused as needed
   670             boolean isData = (e != null);
   671 
   672             for (;;) {
   673                 QNode t = tail;
   674                 QNode h = head;
   675                 if (t == null || h == null)         // saw uninitialized value
   676                     continue;                       // spin
   677 
   678                 if (h == t || t.isData == isData) { // empty or same-mode
   679                     QNode tn = t.next;
   680                     if (t != tail)                  // inconsistent read
   681                         continue;
   682                     if (tn != null) {               // lagging tail
   683                         advanceTail(t, tn);
   684                         continue;
   685                     }
   686                     if (timed && nanos <= 0)        // can't wait
   687                         return null;
   688                     if (s == null)
   689                         s = new QNode(e, isData);
   690                     if (!t.casNext(null, s))        // failed to link in
   691                         continue;
   692 
   693                     advanceTail(t, s);              // swing tail and wait
   694                     Object x = awaitFulfill(s, e, timed, nanos);
   695                     if (x == s) {                   // wait was cancelled
   696                         clean(t, s);
   697                         return null;
   698                     }
   699 
   700                     if (!s.isOffList()) {           // not already unlinked
   701                         advanceHead(t, s);          // unlink if head
   702                         if (x != null)              // and forget fields
   703                             s.item = s;
   704                         s.waiter = null;
   705                     }
   706                     return (x != null) ? x : e;
   707 
   708                 } else {                            // complementary-mode
   709                     QNode m = h.next;               // node to fulfill
   710                     if (t != tail || m == null || h != head)
   711                         continue;                   // inconsistent read
   712 
   713                     Object x = m.item;
   714                     if (isData == (x != null) ||    // m already fulfilled
   715                         x == m ||                   // m cancelled
   716                         !m.casItem(x, e)) {         // lost CAS
   717                         advanceHead(h, m);          // dequeue and retry
   718                         continue;
   719                     }
   720 
   721                     advanceHead(h, m);              // successfully fulfilled
   722                     LockSupport.unpark(m.waiter);
   723                     return (x != null) ? x : e;
   724                 }
   725             }
   726         }
   727 
   728         /**
   729          * Spins/blocks until node s is fulfilled.
   730          *
   731          * @param s the waiting node
   732          * @param e the comparison value for checking match
   733          * @param timed true if timed wait
   734          * @param nanos timeout value
   735          * @return matched item, or s if cancelled
   736          */
   737         Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
   738             /* Same idea as TransferStack.awaitFulfill */
   739             long lastTime = timed ? System.nanoTime() : 0;
   740             Thread w = Thread.currentThread();
   741             int spins = ((head.next == s) ?
   742                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
   743             for (;;) {
   744                 if (w.isInterrupted())
   745                     s.tryCancel(e);
   746                 Object x = s.item;
   747                 if (x != e)
   748                     return x;
   749                 if (timed) {
   750                     long now = System.nanoTime();
   751                     nanos -= now - lastTime;
   752                     lastTime = now;
   753                     if (nanos <= 0) {
   754                         s.tryCancel(e);
   755                         continue;
   756                     }
   757                 }
   758                 if (spins > 0)
   759                     --spins;
   760                 else if (s.waiter == null)
   761                     s.waiter = w;
   762                 else if (!timed)
   763                     LockSupport.park(this);
   764                 else if (nanos > spinForTimeoutThreshold)
   765                     LockSupport.parkNanos(this, nanos);
   766             }
   767         }
   768 
   769         /**
   770          * Gets rid of cancelled node s with original predecessor pred.
   771          */
   772         void clean(QNode pred, QNode s) {
   773             s.waiter = null; // forget thread
   774             /*
   775              * At any given time, exactly one node on list cannot be
   776              * deleted -- the last inserted node. To accommodate this,
   777              * if we cannot delete s, we save its predecessor as
   778              * "cleanMe", deleting the previously saved version
   779              * first. At least one of node s or the node previously
   780              * saved can always be deleted, so this always terminates.
   781              */
   782             while (pred.next == s) { // Return early if already unlinked
   783                 QNode h = head;
   784                 QNode hn = h.next;   // Absorb cancelled first node as head
   785                 if (hn != null && hn.isCancelled()) {
   786                     advanceHead(h, hn);
   787                     continue;
   788                 }
   789                 QNode t = tail;      // Ensure consistent read for tail
   790                 if (t == h)
   791                     return;
   792                 QNode tn = t.next;
   793                 if (t != tail)
   794                     continue;
   795                 if (tn != null) {
   796                     advanceTail(t, tn);
   797                     continue;
   798                 }
   799                 if (s != t) {        // If not tail, try to unsplice
   800                     QNode sn = s.next;
   801                     if (sn == s || pred.casNext(s, sn))
   802                         return;
   803                 }
   804                 QNode dp = cleanMe;
   805                 if (dp != null) {    // Try unlinking previous cancelled node
   806                     QNode d = dp.next;
   807                     QNode dn;
   808                     if (d == null ||               // d is gone or
   809                         d == dp ||                 // d is off list or
   810                         !d.isCancelled() ||        // d not cancelled or
   811                         (d != t &&                 // d not tail and
   812                          (dn = d.next) != null &&  //   has successor
   813                          dn != d &&                //   that is on list
   814                          dp.casNext(d, dn)))       // d unspliced
   815                         casCleanMe(dp, null);
   816                     if (dp == pred)
   817                         return;      // s is already saved node
   818                 } else if (casCleanMe(null, pred))
   819                     return;          // Postpone cleaning s
   820             }
   821         }
   822 
   823         private static final sun.misc.Unsafe UNSAFE;
   824         private static final long headOffset;
   825         private static final long tailOffset;
   826         private static final long cleanMeOffset;
   827         static {
   828             try {
   829                 UNSAFE = sun.misc.Unsafe.getUnsafe();
   830                 Class k = TransferQueue.class;
   831                 headOffset = UNSAFE.objectFieldOffset
   832                     (k.getDeclaredField("head"));
   833                 tailOffset = UNSAFE.objectFieldOffset
   834                     (k.getDeclaredField("tail"));
   835                 cleanMeOffset = UNSAFE.objectFieldOffset
   836                     (k.getDeclaredField("cleanMe"));
   837             } catch (Exception e) {
   838                 throw new Error(e);
   839             }
   840         }
   841     }
   842 
   843     /**
   844      * The transferer. Set only in constructor, but cannot be declared
   845      * as final without further complicating serialization.  Since
   846      * this is accessed only at most once per public method, there
   847      * isn't a noticeable performance penalty for using volatile
   848      * instead of final here.
   849      */
   850     private transient volatile Transferer transferer;
   851 
   852     /**
   853      * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
   854      */
   855     public SynchronousQueue() {
   856         this(false);
   857     }
   858 
   859     /**
   860      * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
   861      *
   862      * @param fair if true, waiting threads contend in FIFO order for
   863      *        access; otherwise the order is unspecified.
   864      */
   865     public SynchronousQueue(boolean fair) {
   866         transferer = fair ? new TransferQueue() : new TransferStack();
   867     }
   868 
   869     /**
   870      * Adds the specified element to this queue, waiting if necessary for
   871      * another thread to receive it.
   872      *
   873      * @throws InterruptedException {@inheritDoc}
   874      * @throws NullPointerException {@inheritDoc}
   875      */
   876     public void put(E o) throws InterruptedException {
   877         if (o == null) throw new NullPointerException();
   878         if (transferer.transfer(o, false, 0) == null) {
   879             Thread.interrupted();
   880             throw new InterruptedException();
   881         }
   882     }
   883 
   884     /**
   885      * Inserts the specified element into this queue, waiting if necessary
   886      * up to the specified wait time for another thread to receive it.
   887      *
   888      * @return <tt>true</tt> if successful, or <tt>false</tt> if the
   889      *         specified waiting time elapses before a consumer appears.
   890      * @throws InterruptedException {@inheritDoc}
   891      * @throws NullPointerException {@inheritDoc}
   892      */
   893     public boolean offer(E o, long timeout, TimeUnit unit)
   894         throws InterruptedException {
   895         if (o == null) throw new NullPointerException();
   896         if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
   897             return true;
   898         if (!Thread.interrupted())
   899             return false;
   900         throw new InterruptedException();
   901     }
   902 
   903     /**
   904      * Inserts the specified element into this queue, if another thread is
   905      * waiting to receive it.
   906      *
   907      * @param e the element to add
   908      * @return <tt>true</tt> if the element was added to this queue, else
   909      *         <tt>false</tt>
   910      * @throws NullPointerException if the specified element is null
   911      */
   912     public boolean offer(E e) {
   913         if (e == null) throw new NullPointerException();
   914         return transferer.transfer(e, true, 0) != null;
   915     }
   916 
   917     /**
   918      * Retrieves and removes the head of this queue, waiting if necessary
   919      * for another thread to insert it.
   920      *
   921      * @return the head of this queue
   922      * @throws InterruptedException {@inheritDoc}
   923      */
   924     public E take() throws InterruptedException {
   925         Object e = transferer.transfer(null, false, 0);
   926         if (e != null)
   927             return (E)e;
   928         Thread.interrupted();
   929         throw new InterruptedException();
   930     }
   931 
   932     /**
   933      * Retrieves and removes the head of this queue, waiting
   934      * if necessary up to the specified wait time, for another thread
   935      * to insert it.
   936      *
   937      * @return the head of this queue, or <tt>null</tt> if the
   938      *         specified waiting time elapses before an element is present.
   939      * @throws InterruptedException {@inheritDoc}
   940      */
   941     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   942         Object e = transferer.transfer(null, true, unit.toNanos(timeout));
   943         if (e != null || !Thread.interrupted())
   944             return (E)e;
   945         throw new InterruptedException();
   946     }
   947 
   948     /**
   949      * Retrieves and removes the head of this queue, if another thread
   950      * is currently making an element available.
   951      *
   952      * @return the head of this queue, or <tt>null</tt> if no
   953      *         element is available.
   954      */
   955     public E poll() {
   956         return (E)transferer.transfer(null, true, 0);
   957     }
   958 
   959     /**
   960      * Always returns <tt>true</tt>.
   961      * A <tt>SynchronousQueue</tt> has no internal capacity.
   962      *
   963      * @return <tt>true</tt>
   964      */
   965     public boolean isEmpty() {
   966         return true;
   967     }
   968 
   969     /**
   970      * Always returns zero.
   971      * A <tt>SynchronousQueue</tt> has no internal capacity.
   972      *
   973      * @return zero.
   974      */
   975     public int size() {
   976         return 0;
   977     }
   978 
   979     /**
   980      * Always returns zero.
   981      * A <tt>SynchronousQueue</tt> has no internal capacity.
   982      *
   983      * @return zero.
   984      */
   985     public int remainingCapacity() {
   986         return 0;
   987     }
   988 
   989     /**
   990      * Does nothing.
   991      * A <tt>SynchronousQueue</tt> has no internal capacity.
   992      */
   993     public void clear() {
   994     }
   995 
   996     /**
   997      * Always returns <tt>false</tt>.
   998      * A <tt>SynchronousQueue</tt> has no internal capacity.
   999      *
  1000      * @param o the element
  1001      * @return <tt>false</tt>
  1002      */
  1003     public boolean contains(Object o) {
  1004         return false;
  1005     }
  1006 
  1007     /**
  1008      * Always returns <tt>false</tt>.
  1009      * A <tt>SynchronousQueue</tt> has no internal capacity.
  1010      *
  1011      * @param o the element to remove
  1012      * @return <tt>false</tt>
  1013      */
  1014     public boolean remove(Object o) {
  1015         return false;
  1016     }
  1017 
  1018     /**
  1019      * Returns <tt>false</tt> unless the given collection is empty.
  1020      * A <tt>SynchronousQueue</tt> has no internal capacity.
  1021      *
  1022      * @param c the collection
  1023      * @return <tt>false</tt> unless given collection is empty
  1024      */
  1025     public boolean containsAll(Collection<?> c) {
  1026         return c.isEmpty();
  1027     }
  1028 
  1029     /**
  1030      * Always returns <tt>false</tt>.
  1031      * A <tt>SynchronousQueue</tt> has no internal capacity.
  1032      *
  1033      * @param c the collection
  1034      * @return <tt>false</tt>
  1035      */
  1036     public boolean removeAll(Collection<?> c) {
  1037         return false;
  1038     }
  1039 
  1040     /**
  1041      * Always returns <tt>false</tt>.
  1042      * A <tt>SynchronousQueue</tt> has no internal capacity.
  1043      *
  1044      * @param c the collection
  1045      * @return <tt>false</tt>
  1046      */
  1047     public boolean retainAll(Collection<?> c) {
  1048         return false;
  1049     }
  1050 
  1051     /**
  1052      * Always returns <tt>null</tt>.
  1053      * A <tt>SynchronousQueue</tt> does not return elements
  1054      * unless actively waited on.
  1055      *
  1056      * @return <tt>null</tt>
  1057      */
  1058     public E peek() {
  1059         return null;
  1060     }
  1061 
  1062     /**
  1063      * Returns an empty iterator in which <tt>hasNext</tt> always returns
  1064      * <tt>false</tt>.
  1065      *
  1066      * @return an empty iterator
  1067      */
  1068     public Iterator<E> iterator() {
  1069         return Collections.emptyIterator();
  1070     }
  1071 
  1072     /**
  1073      * Returns a zero-length array.
  1074      * @return a zero-length array
  1075      */
  1076     public Object[] toArray() {
  1077         return new Object[0];
  1078     }
  1079 
  1080     /**
  1081      * Sets the zeroeth element of the specified array to <tt>null</tt>
  1082      * (if the array has non-zero length) and returns it.
  1083      *
  1084      * @param a the array
  1085      * @return the specified array
  1086      * @throws NullPointerException if the specified array is null
  1087      */
  1088     public <T> T[] toArray(T[] a) {
  1089         if (a.length > 0)
  1090             a[0] = null;
  1091         return a;
  1092     }
  1093 
  1094     /**
  1095      * @throws UnsupportedOperationException {@inheritDoc}
  1096      * @throws ClassCastException            {@inheritDoc}
  1097      * @throws NullPointerException          {@inheritDoc}
  1098      * @throws IllegalArgumentException      {@inheritDoc}
  1099      */
  1100     public int drainTo(Collection<? super E> c) {
  1101         if (c == null)
  1102             throw new NullPointerException();
  1103         if (c == this)
  1104             throw new IllegalArgumentException();
  1105         int n = 0;
  1106         E e;
  1107         while ( (e = poll()) != null) {
  1108             c.add(e);
  1109             ++n;
  1110         }
  1111         return n;
  1112     }
  1113 
  1114     /**
  1115      * @throws UnsupportedOperationException {@inheritDoc}
  1116      * @throws ClassCastException            {@inheritDoc}
  1117      * @throws NullPointerException          {@inheritDoc}
  1118      * @throws IllegalArgumentException      {@inheritDoc}
  1119      */
  1120     public int drainTo(Collection<? super E> c, int maxElements) {
  1121         if (c == null)
  1122             throw new NullPointerException();
  1123         if (c == this)
  1124             throw new IllegalArgumentException();
  1125         int n = 0;
  1126         E e;
  1127         while (n < maxElements && (e = poll()) != null) {
  1128             c.add(e);
  1129             ++n;
  1130         }
  1131         return n;
  1132     }
  1133 
  1134     /*
  1135      * To cope with serialization strategy in the 1.5 version of
  1136      * SynchronousQueue, we declare some unused classes and fields
  1137      * that exist solely to enable serializability across versions.
  1138      * These fields are never used, so are initialized only if this
  1139      * object is ever serialized or deserialized.
  1140      */
  1141 
  1142     static class WaitQueue implements java.io.Serializable { }
  1143     static class LifoWaitQueue extends WaitQueue {
  1144         private static final long serialVersionUID = -3633113410248163686L;
  1145     }
  1146     static class FifoWaitQueue extends WaitQueue {
  1147         private static final long serialVersionUID = -3623113410248163686L;
  1148     }
  1149     private ReentrantLock qlock;
  1150     private WaitQueue waitingProducers;
  1151     private WaitQueue waitingConsumers;
  1152 
  1153     /**
  1154      * Save the state to a stream (that is, serialize it).
  1155      *
  1156      * @param s the stream
  1157      */
  1158     private void writeObject(java.io.ObjectOutputStream s)
  1159         throws java.io.IOException {
  1160         boolean fair = transferer instanceof TransferQueue;
  1161         if (fair) {
  1162             qlock = new ReentrantLock(true);
  1163             waitingProducers = new FifoWaitQueue();
  1164             waitingConsumers = new FifoWaitQueue();
  1165         }
  1166         else {
  1167             qlock = new ReentrantLock();
  1168             waitingProducers = new LifoWaitQueue();
  1169             waitingConsumers = new LifoWaitQueue();
  1170         }
  1171         s.defaultWriteObject();
  1172     }
  1173 
  1174     private void readObject(final java.io.ObjectInputStream s)
  1175         throws java.io.IOException, ClassNotFoundException {
  1176         s.defaultReadObject();
  1177         if (waitingProducers instanceof FifoWaitQueue)
  1178             transferer = new TransferQueue();
  1179         else
  1180             transferer = new TransferStack();
  1181     }
  1182 
  1183     // Unsafe mechanics
  1184     static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
  1185                                   String field, Class<?> klazz) {
  1186         try {
  1187             return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
  1188         } catch (NoSuchFieldException e) {
  1189             // Convert Exception to corresponding Error
  1190             NoSuchFieldError error = new NoSuchFieldError(field);
  1191             error.initCause(e);
  1192             throw error;
  1193         }
  1194     }
  1195 
  1196 }