rt/emul/compact/src/main/java/java/util/concurrent/SynchronousQueue.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 12:51:03 +0100
changeset 1895 bfaf3300b7ba
parent 1890 212417b74b72
permissions -rw-r--r--
Making java.util.concurrent package compilable except ForkJoinPool
jaroslav@1890
     1
/*
jaroslav@1890
     2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
jaroslav@1890
     3
 *
jaroslav@1890
     4
 * This code is free software; you can redistribute it and/or modify it
jaroslav@1890
     5
 * under the terms of the GNU General Public License version 2 only, as
jaroslav@1890
     6
 * published by the Free Software Foundation.  Oracle designates this
jaroslav@1890
     7
 * particular file as subject to the "Classpath" exception as provided
jaroslav@1890
     8
 * by Oracle in the LICENSE file that accompanied this code.
jaroslav@1890
     9
 *
jaroslav@1890
    10
 * This code is distributed in the hope that it will be useful, but WITHOUT
jaroslav@1890
    11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
jaroslav@1890
    12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
jaroslav@1890
    13
 * version 2 for more details (a copy is included in the LICENSE file that
jaroslav@1890
    14
 * accompanied this code).
jaroslav@1890
    15
 *
jaroslav@1890
    16
 * You should have received a copy of the GNU General Public License version
jaroslav@1890
    17
 * 2 along with this work; if not, write to the Free Software Foundation,
jaroslav@1890
    18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
jaroslav@1890
    19
 *
jaroslav@1890
    20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
jaroslav@1890
    21
 * or visit www.oracle.com if you need additional information or have any
jaroslav@1890
    22
 * questions.
jaroslav@1890
    23
 */
jaroslav@1890
    24
jaroslav@1890
    25
/*
jaroslav@1890
    26
 * This file is available under and governed by the GNU General Public
jaroslav@1890
    27
 * License version 2 only, as published by the Free Software Foundation.
jaroslav@1890
    28
 * However, the following notice accompanied the original version of this
jaroslav@1890
    29
 * file:
jaroslav@1890
    30
 *
jaroslav@1890
    31
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
jaroslav@1890
    32
 * assistance from members of JCP JSR-166 Expert Group and released to
jaroslav@1890
    33
 * the public domain, as explained at
jaroslav@1890
    34
 * http://creativecommons.org/publicdomain/zero/1.0/
jaroslav@1890
    35
 */
jaroslav@1890
    36
jaroslav@1890
    37
package java.util.concurrent;
jaroslav@1890
    38
import java.util.concurrent.locks.*;
jaroslav@1890
    39
import java.util.*;
jaroslav@1890
    40
jaroslav@1890
    41
/**
jaroslav@1890
    42
 * A {@linkplain BlockingQueue blocking queue} in which each insert
jaroslav@1890
    43
 * operation must wait for a corresponding remove operation by another
jaroslav@1890
    44
 * thread, and vice versa.  A synchronous queue does not have any
jaroslav@1890
    45
 * internal capacity, not even a capacity of one.  You cannot
jaroslav@1890
    46
 * <tt>peek</tt> at a synchronous queue because an element is only
jaroslav@1890
    47
 * present when you try to remove it; you cannot insert an element
jaroslav@1890
    48
 * (using any method) unless another thread is trying to remove it;
jaroslav@1890
    49
 * you cannot iterate as there is nothing to iterate.  The
jaroslav@1890
    50
 * <em>head</em> of the queue is the element that the first queued
jaroslav@1890
    51
 * inserting thread is trying to add to the queue; if there is no such
jaroslav@1890
    52
 * queued thread then no element is available for removal and
jaroslav@1890
    53
 * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
jaroslav@1890
    54
 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
jaroslav@1890
    55
 * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
jaroslav@1890
    56
 * does not permit <tt>null</tt> elements.
jaroslav@1890
    57
 *
jaroslav@1890
    58
 * <p>Synchronous queues are similar to rendezvous channels used in
jaroslav@1890
    59
 * CSP and Ada. They are well suited for handoff designs, in which an
jaroslav@1890
    60
 * object running in one thread must sync up with an object running
jaroslav@1890
    61
 * in another thread in order to hand it some information, event, or
jaroslav@1890
    62
 * task.
jaroslav@1890
    63
 *
jaroslav@1890
    64
 * <p> This class supports an optional fairness policy for ordering
jaroslav@1890
    65
 * waiting producer and consumer threads.  By default, this ordering
jaroslav@1890
    66
 * is not guaranteed. However, a queue constructed with fairness set
jaroslav@1890
    67
 * to <tt>true</tt> grants threads access in FIFO order.
jaroslav@1890
    68
 *
jaroslav@1890
    69
 * <p>This class and its iterator implement all of the
jaroslav@1890
    70
 * <em>optional</em> methods of the {@link Collection} and {@link
jaroslav@1890
    71
 * Iterator} interfaces.
jaroslav@1890
    72
 *
jaroslav@1890
    73
 * <p>This class is a member of the
jaroslav@1890
    74
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
jaroslav@1890
    75
 * Java Collections Framework</a>.
jaroslav@1890
    76
 *
jaroslav@1890
    77
 * @since 1.5
jaroslav@1890
    78
 * @author Doug Lea and Bill Scherer and Michael Scott
jaroslav@1890
    79
 * @param <E> the type of elements held in this collection
jaroslav@1890
    80
 */
jaroslav@1890
    81
public class SynchronousQueue<E> extends AbstractQueue<E>
jaroslav@1890
    82
    implements BlockingQueue<E>, java.io.Serializable {
jaroslav@1890
    83
    private static final long serialVersionUID = -3223113410248163686L;
jaroslav@1890
    84
jaroslav@1890
    85
    /*
jaroslav@1890
    86
     * This class implements extensions of the dual stack and dual
jaroslav@1890
    87
     * queue algorithms described in "Nonblocking Concurrent Objects
jaroslav@1890
    88
     * with Condition Synchronization", by W. N. Scherer III and
jaroslav@1890
    89
     * M. L. Scott.  18th Annual Conf. on Distributed Computing,
jaroslav@1890
    90
     * Oct. 2004 (see also
jaroslav@1890
    91
     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
jaroslav@1890
    92
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
jaroslav@1890
    93
     * queue for fair mode. The performance of the two is generally
jaroslav@1890
    94
     * similar. Fifo usually supports higher throughput under
jaroslav@1890
    95
     * contention but Lifo maintains higher thread locality in common
jaroslav@1890
    96
     * applications.
jaroslav@1890
    97
     *
jaroslav@1890
    98
     * A dual queue (and similarly stack) is one that at any given
jaroslav@1890
    99
     * time either holds "data" -- items provided by put operations,
jaroslav@1890
   100
     * or "requests" -- slots representing take operations, or is
jaroslav@1890
   101
     * empty. A call to "fulfill" (i.e., a call requesting an item
jaroslav@1890
   102
     * from a queue holding data or vice versa) dequeues a
jaroslav@1890
   103
     * complementary node.  The most interesting feature of these
jaroslav@1890
   104
     * queues is that any operation can figure out which mode the
jaroslav@1890
   105
     * queue is in, and act accordingly without needing locks.
jaroslav@1890
   106
     *
jaroslav@1890
   107
     * Both the queue and stack extend abstract class Transferer
jaroslav@1890
   108
     * defining the single method transfer that does a put or a
jaroslav@1890
   109
     * take. These are unified into a single method because in dual
jaroslav@1890
   110
     * data structures, the put and take operations are symmetrical,
jaroslav@1890
   111
     * so nearly all code can be combined. The resulting transfer
jaroslav@1890
   112
     * methods are on the long side, but are easier to follow than
jaroslav@1890
   113
     * they would be if broken up into nearly-duplicated parts.
jaroslav@1890
   114
     *
jaroslav@1890
   115
     * The queue and stack data structures share many conceptual
jaroslav@1890
   116
     * similarities but very few concrete details. For simplicity,
jaroslav@1890
   117
     * they are kept distinct so that they can later evolve
jaroslav@1890
   118
     * separately.
jaroslav@1890
   119
     *
jaroslav@1890
   120
     * The algorithms here differ from the versions in the above paper
jaroslav@1890
   121
     * in extending them for use in synchronous queues, as well as
jaroslav@1890
   122
     * dealing with cancellation. The main differences include:
jaroslav@1890
   123
     *
jaroslav@1890
   124
     *  1. The original algorithms used bit-marked pointers, but
jaroslav@1890
   125
     *     the ones here use mode bits in nodes, leading to a number
jaroslav@1890
   126
     *     of further adaptations.
jaroslav@1890
   127
     *  2. SynchronousQueues must block threads waiting to become
jaroslav@1890
   128
     *     fulfilled.
jaroslav@1890
   129
     *  3. Support for cancellation via timeout and interrupts,
jaroslav@1890
   130
     *     including cleaning out cancelled nodes/threads
jaroslav@1890
   131
     *     from lists to avoid garbage retention and memory depletion.
jaroslav@1890
   132
     *
jaroslav@1890
   133
     * Blocking is mainly accomplished using LockSupport park/unpark,
jaroslav@1890
   134
     * except that nodes that appear to be the next ones to become
jaroslav@1890
   135
     * fulfilled first spin a bit (on multiprocessors only). On very
jaroslav@1890
   136
     * busy synchronous queues, spinning can dramatically improve
jaroslav@1890
   137
     * throughput. And on less busy ones, the amount of spinning is
jaroslav@1890
   138
     * small enough not to be noticeable.
jaroslav@1890
   139
     *
jaroslav@1890
   140
     * Cleaning is done in different ways in queues vs stacks.  For
jaroslav@1890
   141
     * queues, we can almost always remove a node immediately in O(1)
jaroslav@1890
   142
     * time (modulo retries for consistency checks) when it is
jaroslav@1890
   143
     * cancelled. But if it may be pinned as the current tail, it must
jaroslav@1890
   144
     * wait until some subsequent cancellation. For stacks, we need a
jaroslav@1890
   145
     * potentially O(n) traversal to be sure that we can remove the
jaroslav@1890
   146
     * node, but this can run concurrently with other threads
jaroslav@1890
   147
     * accessing the stack.
jaroslav@1890
   148
     *
jaroslav@1890
   149
     * While garbage collection takes care of most node reclamation
jaroslav@1890
   150
     * issues that otherwise complicate nonblocking algorithms, care
jaroslav@1890
   151
     * is taken to "forget" references to data, other nodes, and
jaroslav@1890
   152
     * threads that might be held on to long-term by blocked
jaroslav@1890
   153
     * threads. In cases where setting to null would otherwise
jaroslav@1890
   154
     * conflict with main algorithms, this is done by changing a
jaroslav@1890
   155
     * node's link to now point to the node itself. This doesn't arise
jaroslav@1890
   156
     * much for Stack nodes (because blocked threads do not hang on to
jaroslav@1890
   157
     * old head pointers), but references in Queue nodes must be
jaroslav@1890
   158
     * aggressively forgotten to avoid reachability of everything any
jaroslav@1890
   159
     * node has ever referred to since arrival.
jaroslav@1890
   160
     */
jaroslav@1890
   161
jaroslav@1890
   162
    /**
jaroslav@1890
   163
     * Shared internal API for dual stacks and queues.
jaroslav@1890
   164
     */
jaroslav@1890
   165
    abstract static class Transferer {
jaroslav@1890
   166
        /**
jaroslav@1890
   167
         * Performs a put or take.
jaroslav@1890
   168
         *
jaroslav@1890
   169
         * @param e if non-null, the item to be handed to a consumer;
jaroslav@1890
   170
         *          if null, requests that transfer return an item
jaroslav@1890
   171
         *          offered by producer.
jaroslav@1890
   172
         * @param timed if this operation should timeout
jaroslav@1890
   173
         * @param nanos the timeout, in nanoseconds
jaroslav@1890
   174
         * @return if non-null, the item provided or received; if null,
jaroslav@1890
   175
         *         the operation failed due to timeout or interrupt --
jaroslav@1890
   176
         *         the caller can distinguish which of these occurred
jaroslav@1890
   177
         *         by checking Thread.interrupted.
jaroslav@1890
   178
         */
jaroslav@1890
   179
        abstract Object transfer(Object e, boolean timed, long nanos);
jaroslav@1890
   180
    }
jaroslav@1890
   181
jaroslav@1890
   182
    /** The number of CPUs, for spin control */
jaroslav@1895
   183
    static final int NCPUS = 1;
jaroslav@1890
   184
jaroslav@1890
   185
    /**
jaroslav@1890
   186
     * The number of times to spin before blocking in timed waits.
jaroslav@1890
   187
     * The value is empirically derived -- it works well across a
jaroslav@1890
   188
     * variety of processors and OSes. Empirically, the best value
jaroslav@1890
   189
     * seems not to vary with number of CPUs (beyond 2) so is just
jaroslav@1890
   190
     * a constant.
jaroslav@1890
   191
     */
jaroslav@1890
   192
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
jaroslav@1890
   193
jaroslav@1890
   194
    /**
jaroslav@1890
   195
     * The number of times to spin before blocking in untimed waits.
jaroslav@1890
   196
     * This is greater than timed value because untimed waits spin
jaroslav@1890
   197
     * faster since they don't need to check times on each spin.
jaroslav@1890
   198
     */
jaroslav@1890
   199
    static final int maxUntimedSpins = maxTimedSpins * 16;
jaroslav@1890
   200
jaroslav@1890
   201
    /**
jaroslav@1890
   202
     * The number of nanoseconds for which it is faster to spin
jaroslav@1890
   203
     * rather than to use timed park. A rough estimate suffices.
jaroslav@1890
   204
     */
jaroslav@1890
   205
    static final long spinForTimeoutThreshold = 1000L;
jaroslav@1890
   206
jaroslav@1890
   207
    /** Dual stack */
jaroslav@1890
   208
    static final class TransferStack extends Transferer {
jaroslav@1890
   209
        /*
jaroslav@1890
   210
         * This extends Scherer-Scott dual stack algorithm, differing,
jaroslav@1890
   211
         * among other ways, by using "covering" nodes rather than
jaroslav@1890
   212
         * bit-marked pointers: Fulfilling operations push on marker
jaroslav@1890
   213
         * nodes (with FULFILLING bit set in mode) to reserve a spot
jaroslav@1890
   214
         * to match a waiting node.
jaroslav@1890
   215
         */
jaroslav@1890
   216
jaroslav@1890
   217
        /* Modes for SNodes, ORed together in node fields */
jaroslav@1890
   218
        /** Node represents an unfulfilled consumer */
jaroslav@1890
   219
        static final int REQUEST    = 0;
jaroslav@1890
   220
        /** Node represents an unfulfilled producer */
jaroslav@1890
   221
        static final int DATA       = 1;
jaroslav@1890
   222
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
jaroslav@1890
   223
        static final int FULFILLING = 2;
jaroslav@1890
   224
jaroslav@1890
   225
        /** Return true if m has fulfilling bit set */
jaroslav@1890
   226
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
jaroslav@1890
   227
jaroslav@1890
   228
        /** Node class for TransferStacks. */
jaroslav@1890
   229
        static final class SNode {
jaroslav@1890
   230
            volatile SNode next;        // next node in stack
jaroslav@1890
   231
            volatile SNode match;       // the node matched to this
jaroslav@1890
   232
            volatile Thread waiter;     // to control park/unpark
jaroslav@1890
   233
            Object item;                // data; or null for REQUESTs
jaroslav@1890
   234
            int mode;
jaroslav@1890
   235
            // Note: item and mode fields don't need to be volatile
jaroslav@1890
   236
            // since they are always written before, and read after,
jaroslav@1890
   237
            // other volatile/atomic operations.
jaroslav@1890
   238
jaroslav@1890
   239
            SNode(Object item) {
jaroslav@1890
   240
                this.item = item;
jaroslav@1890
   241
            }
jaroslav@1890
   242
jaroslav@1890
   243
            boolean casNext(SNode cmp, SNode val) {
jaroslav@1895
   244
                if (next == cmp) {
jaroslav@1895
   245
                    next = val;
jaroslav@1895
   246
                    return true;
jaroslav@1895
   247
                }
jaroslav@1895
   248
                return false;
jaroslav@1890
   249
            }
jaroslav@1890
   250
jaroslav@1890
   251
            /**
jaroslav@1890
   252
             * Tries to match node s to this node, if so, waking up thread.
jaroslav@1890
   253
             * Fulfillers call tryMatch to identify their waiters.
jaroslav@1890
   254
             * Waiters block until they have been matched.
jaroslav@1890
   255
             *
jaroslav@1890
   256
             * @param s the node to match
jaroslav@1890
   257
             * @return true if successfully matched to s
jaroslav@1890
   258
             */
jaroslav@1890
   259
            boolean tryMatch(SNode s) {
jaroslav@1895
   260
                if (match == null) {
jaroslav@1895
   261
                    match = s;
jaroslav@1890
   262
                    Thread w = waiter;
jaroslav@1890
   263
                    if (w != null) {    // waiters need at most one unpark
jaroslav@1890
   264
                        waiter = null;
jaroslav@1890
   265
                        LockSupport.unpark(w);
jaroslav@1890
   266
                    }
jaroslav@1890
   267
                    return true;
jaroslav@1890
   268
                }
jaroslav@1890
   269
                return match == s;
jaroslav@1890
   270
            }
jaroslav@1890
   271
jaroslav@1890
   272
            /**
jaroslav@1890
   273
             * Tries to cancel a wait by matching node to itself.
jaroslav@1890
   274
             */
jaroslav@1890
   275
            void tryCancel() {
jaroslav@1895
   276
                if (match == null) {
jaroslav@1895
   277
                    match = this;
jaroslav@1895
   278
                }
jaroslav@1890
   279
            }
jaroslav@1890
   280
jaroslav@1890
   281
            boolean isCancelled() {
jaroslav@1890
   282
                return match == this;
jaroslav@1890
   283
            }
jaroslav@1890
   284
        }
jaroslav@1890
   285
jaroslav@1890
   286
        /** The head (top) of the stack */
jaroslav@1890
   287
        volatile SNode head;
jaroslav@1890
   288
jaroslav@1890
   289
        boolean casHead(SNode h, SNode nh) {
jaroslav@1895
   290
            if (head == h) {
jaroslav@1895
   291
                head = nh;
jaroslav@1895
   292
                return true;
jaroslav@1895
   293
            }
jaroslav@1895
   294
            return false;
jaroslav@1890
   295
        }
jaroslav@1890
   296
jaroslav@1890
   297
        /**
jaroslav@1890
   298
         * Creates or resets fields of a node. Called only from transfer
jaroslav@1890
   299
         * where the node to push on stack is lazily created and
jaroslav@1890
   300
         * reused when possible to help reduce intervals between reads
jaroslav@1890
   301
         * and CASes of head and to avoid surges of garbage when CASes
jaroslav@1890
   302
         * to push nodes fail due to contention.
jaroslav@1890
   303
         */
jaroslav@1890
   304
        static SNode snode(SNode s, Object e, SNode next, int mode) {
jaroslav@1890
   305
            if (s == null) s = new SNode(e);
jaroslav@1890
   306
            s.mode = mode;
jaroslav@1890
   307
            s.next = next;
jaroslav@1890
   308
            return s;
jaroslav@1890
   309
        }
jaroslav@1890
   310
jaroslav@1890
   311
        /**
jaroslav@1890
   312
         * Puts or takes an item.
jaroslav@1890
   313
         */
jaroslav@1890
   314
        Object transfer(Object e, boolean timed, long nanos) {
jaroslav@1890
   315
            /*
jaroslav@1890
   316
             * Basic algorithm is to loop trying one of three actions:
jaroslav@1890
   317
             *
jaroslav@1890
   318
             * 1. If apparently empty or already containing nodes of same
jaroslav@1890
   319
             *    mode, try to push node on stack and wait for a match,
jaroslav@1890
   320
             *    returning it, or null if cancelled.
jaroslav@1890
   321
             *
jaroslav@1890
   322
             * 2. If apparently containing node of complementary mode,
jaroslav@1890
   323
             *    try to push a fulfilling node on to stack, match
jaroslav@1890
   324
             *    with corresponding waiting node, pop both from
jaroslav@1890
   325
             *    stack, and return matched item. The matching or
jaroslav@1890
   326
             *    unlinking might not actually be necessary because of
jaroslav@1890
   327
             *    other threads performing action 3:
jaroslav@1890
   328
             *
jaroslav@1890
   329
             * 3. If top of stack already holds another fulfilling node,
jaroslav@1890
   330
             *    help it out by doing its match and/or pop
jaroslav@1890
   331
             *    operations, and then continue. The code for helping
jaroslav@1890
   332
             *    is essentially the same as for fulfilling, except
jaroslav@1890
   333
             *    that it doesn't return the item.
jaroslav@1890
   334
             */
jaroslav@1890
   335
jaroslav@1890
   336
            SNode s = null; // constructed/reused as needed
jaroslav@1890
   337
            int mode = (e == null) ? REQUEST : DATA;
jaroslav@1890
   338
jaroslav@1890
   339
            for (;;) {
jaroslav@1890
   340
                SNode h = head;
jaroslav@1890
   341
                if (h == null || h.mode == mode) {  // empty or same-mode
jaroslav@1890
   342
                    if (timed && nanos <= 0) {      // can't wait
jaroslav@1890
   343
                        if (h != null && h.isCancelled())
jaroslav@1890
   344
                            casHead(h, h.next);     // pop cancelled node
jaroslav@1890
   345
                        else
jaroslav@1890
   346
                            return null;
jaroslav@1890
   347
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
jaroslav@1890
   348
                        SNode m = awaitFulfill(s, timed, nanos);
jaroslav@1890
   349
                        if (m == s) {               // wait was cancelled
jaroslav@1890
   350
                            clean(s);
jaroslav@1890
   351
                            return null;
jaroslav@1890
   352
                        }
jaroslav@1890
   353
                        if ((h = head) != null && h.next == s)
jaroslav@1890
   354
                            casHead(h, s.next);     // help s's fulfiller
jaroslav@1890
   355
                        return (mode == REQUEST) ? m.item : s.item;
jaroslav@1890
   356
                    }
jaroslav@1890
   357
                } else if (!isFulfilling(h.mode)) { // try to fulfill
jaroslav@1890
   358
                    if (h.isCancelled())            // already cancelled
jaroslav@1890
   359
                        casHead(h, h.next);         // pop and retry
jaroslav@1890
   360
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
jaroslav@1890
   361
                        for (;;) { // loop until matched or waiters disappear
jaroslav@1890
   362
                            SNode m = s.next;       // m is s's match
jaroslav@1890
   363
                            if (m == null) {        // all waiters are gone
jaroslav@1890
   364
                                casHead(s, null);   // pop fulfill node
jaroslav@1890
   365
                                s = null;           // use new node next time
jaroslav@1890
   366
                                break;              // restart main loop
jaroslav@1890
   367
                            }
jaroslav@1890
   368
                            SNode mn = m.next;
jaroslav@1890
   369
                            if (m.tryMatch(s)) {
jaroslav@1890
   370
                                casHead(s, mn);     // pop both s and m
jaroslav@1890
   371
                                return (mode == REQUEST) ? m.item : s.item;
jaroslav@1890
   372
                            } else                  // lost match
jaroslav@1890
   373
                                s.casNext(m, mn);   // help unlink
jaroslav@1890
   374
                        }
jaroslav@1890
   375
                    }
jaroslav@1890
   376
                } else {                            // help a fulfiller
jaroslav@1890
   377
                    SNode m = h.next;               // m is h's match
jaroslav@1890
   378
                    if (m == null)                  // waiter is gone
jaroslav@1890
   379
                        casHead(h, null);           // pop fulfilling node
jaroslav@1890
   380
                    else {
jaroslav@1890
   381
                        SNode mn = m.next;
jaroslav@1890
   382
                        if (m.tryMatch(h))          // help match
jaroslav@1890
   383
                            casHead(h, mn);         // pop both h and m
jaroslav@1890
   384
                        else                        // lost match
jaroslav@1890
   385
                            h.casNext(m, mn);       // help unlink
jaroslav@1890
   386
                    }
jaroslav@1890
   387
                }
jaroslav@1890
   388
            }
jaroslav@1890
   389
        }
jaroslav@1890
   390
jaroslav@1890
   391
        /**
jaroslav@1890
   392
         * Spins/blocks until node s is matched by a fulfill operation.
jaroslav@1890
   393
         *
jaroslav@1890
   394
         * @param s the waiting node
jaroslav@1890
   395
         * @param timed true if timed wait
jaroslav@1890
   396
         * @param nanos timeout value
jaroslav@1890
   397
         * @return matched node, or s if cancelled
jaroslav@1890
   398
         */
jaroslav@1890
   399
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
jaroslav@1890
   400
            /*
jaroslav@1890
   401
             * When a node/thread is about to block, it sets its waiter
jaroslav@1890
   402
             * field and then rechecks state at least one more time
jaroslav@1890
   403
             * before actually parking, thus covering race vs
jaroslav@1890
   404
             * fulfiller noticing that waiter is non-null so should be
jaroslav@1890
   405
             * woken.
jaroslav@1890
   406
             *
jaroslav@1890
   407
             * When invoked by nodes that appear at the point of call
jaroslav@1890
   408
             * to be at the head of the stack, calls to park are
jaroslav@1890
   409
             * preceded by spins to avoid blocking when producers and
jaroslav@1890
   410
             * consumers are arriving very close in time.  This can
jaroslav@1890
   411
             * happen enough to bother only on multiprocessors.
jaroslav@1890
   412
             *
jaroslav@1890
   413
             * The order of checks for returning out of main loop
jaroslav@1890
   414
             * reflects fact that interrupts have precedence over
jaroslav@1890
   415
             * normal returns, which have precedence over
jaroslav@1890
   416
             * timeouts. (So, on timeout, one last check for match is
jaroslav@1890
   417
             * done before giving up.) Except that calls from untimed
jaroslav@1890
   418
             * SynchronousQueue.{poll/offer} don't check interrupts
jaroslav@1890
   419
             * and don't wait at all, so are trapped in transfer
jaroslav@1890
   420
             * method rather than calling awaitFulfill.
jaroslav@1890
   421
             */
jaroslav@1890
   422
            long lastTime = timed ? System.nanoTime() : 0;
jaroslav@1890
   423
            Thread w = Thread.currentThread();
jaroslav@1890
   424
            SNode h = head;
jaroslav@1890
   425
            int spins = (shouldSpin(s) ?
jaroslav@1890
   426
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
jaroslav@1890
   427
            for (;;) {
jaroslav@1890
   428
                if (w.isInterrupted())
jaroslav@1890
   429
                    s.tryCancel();
jaroslav@1890
   430
                SNode m = s.match;
jaroslav@1890
   431
                if (m != null)
jaroslav@1890
   432
                    return m;
jaroslav@1890
   433
                if (timed) {
jaroslav@1890
   434
                    long now = System.nanoTime();
jaroslav@1890
   435
                    nanos -= now - lastTime;
jaroslav@1890
   436
                    lastTime = now;
jaroslav@1890
   437
                    if (nanos <= 0) {
jaroslav@1890
   438
                        s.tryCancel();
jaroslav@1890
   439
                        continue;
jaroslav@1890
   440
                    }
jaroslav@1890
   441
                }
jaroslav@1890
   442
                if (spins > 0)
jaroslav@1890
   443
                    spins = shouldSpin(s) ? (spins-1) : 0;
jaroslav@1890
   444
                else if (s.waiter == null)
jaroslav@1890
   445
                    s.waiter = w; // establish waiter so can park next iter
jaroslav@1890
   446
                else if (!timed)
jaroslav@1890
   447
                    LockSupport.park(this);
jaroslav@1890
   448
                else if (nanos > spinForTimeoutThreshold)
jaroslav@1890
   449
                    LockSupport.parkNanos(this, nanos);
jaroslav@1890
   450
            }
jaroslav@1890
   451
        }
jaroslav@1890
   452
jaroslav@1890
   453
        /**
jaroslav@1890
   454
         * Returns true if node s is at head or there is an active
jaroslav@1890
   455
         * fulfiller.
jaroslav@1890
   456
         */
jaroslav@1890
   457
        boolean shouldSpin(SNode s) {
jaroslav@1890
   458
            SNode h = head;
jaroslav@1890
   459
            return (h == s || h == null || isFulfilling(h.mode));
jaroslav@1890
   460
        }
jaroslav@1890
   461
jaroslav@1890
   462
        /**
jaroslav@1890
   463
         * Unlinks s from the stack.
jaroslav@1890
   464
         */
jaroslav@1890
   465
        void clean(SNode s) {
jaroslav@1890
   466
            s.item = null;   // forget item
jaroslav@1890
   467
            s.waiter = null; // forget thread
jaroslav@1890
   468
jaroslav@1890
   469
            /*
jaroslav@1890
   470
             * At worst we may need to traverse entire stack to unlink
jaroslav@1890
   471
             * s. If there are multiple concurrent calls to clean, we
jaroslav@1890
   472
             * might not see s if another thread has already removed
jaroslav@1890
   473
             * it. But we can stop when we see any node known to
jaroslav@1890
   474
             * follow s. We use s.next unless it too is cancelled, in
jaroslav@1890
   475
             * which case we try the node one past. We don't check any
jaroslav@1890
   476
             * further because we don't want to doubly traverse just to
jaroslav@1890
   477
             * find sentinel.
jaroslav@1890
   478
             */
jaroslav@1890
   479
jaroslav@1890
   480
            SNode past = s.next;
jaroslav@1890
   481
            if (past != null && past.isCancelled())
jaroslav@1890
   482
                past = past.next;
jaroslav@1890
   483
jaroslav@1890
   484
            // Absorb cancelled nodes at head
jaroslav@1890
   485
            SNode p;
jaroslav@1890
   486
            while ((p = head) != null && p != past && p.isCancelled())
jaroslav@1890
   487
                casHead(p, p.next);
jaroslav@1890
   488
jaroslav@1890
   489
            // Unsplice embedded nodes
jaroslav@1890
   490
            while (p != null && p != past) {
jaroslav@1890
   491
                SNode n = p.next;
jaroslav@1890
   492
                if (n != null && n.isCancelled())
jaroslav@1890
   493
                    p.casNext(n, n.next);
jaroslav@1890
   494
                else
jaroslav@1890
   495
                    p = n;
jaroslav@1890
   496
            }
jaroslav@1890
   497
        }
jaroslav@1890
   498
    }
jaroslav@1890
   499
jaroslav@1890
   500
    /** Dual Queue */
jaroslav@1890
   501
    static final class TransferQueue extends Transferer {
jaroslav@1890
   502
        /*
jaroslav@1890
   503
         * This extends Scherer-Scott dual queue algorithm, differing,
jaroslav@1890
   504
         * among other ways, by using modes within nodes rather than
jaroslav@1890
   505
         * marked pointers. The algorithm is a little simpler than
jaroslav@1890
   506
         * that for stacks because fulfillers do not need explicit
jaroslav@1890
   507
         * nodes, and matching is done by CAS'ing QNode.item field
jaroslav@1890
   508
         * from non-null to null (for put) or vice versa (for take).
jaroslav@1890
   509
         */
jaroslav@1890
   510
jaroslav@1890
   511
        /** Node class for TransferQueue. */
jaroslav@1890
   512
        static final class QNode {
jaroslav@1890
   513
            volatile QNode next;          // next node in queue
jaroslav@1890
   514
            volatile Object item;         // CAS'ed to or from null
jaroslav@1890
   515
            volatile Thread waiter;       // to control park/unpark
jaroslav@1890
   516
            final boolean isData;
jaroslav@1890
   517
jaroslav@1890
   518
            QNode(Object item, boolean isData) {
jaroslav@1890
   519
                this.item = item;
jaroslav@1890
   520
                this.isData = isData;
jaroslav@1890
   521
            }
jaroslav@1890
   522
jaroslav@1890
   523
            boolean casNext(QNode cmp, QNode val) {
jaroslav@1895
   524
                if (next == cmp) {
jaroslav@1895
   525
                    next = val;
jaroslav@1895
   526
                    return true;
jaroslav@1895
   527
                }
jaroslav@1895
   528
                return false;
jaroslav@1890
   529
            }
jaroslav@1890
   530
jaroslav@1890
   531
            boolean casItem(Object cmp, Object val) {
jaroslav@1895
   532
                if (item == cmp) {
jaroslav@1895
   533
                    item = val;
jaroslav@1895
   534
                    return true;
jaroslav@1895
   535
                }
jaroslav@1895
   536
                return false;
jaroslav@1890
   537
            }
jaroslav@1890
   538
jaroslav@1890
   539
            /**
jaroslav@1890
   540
             * Tries to cancel by CAS'ing ref to this as item.
jaroslav@1890
   541
             */
jaroslav@1890
   542
            void tryCancel(Object cmp) {
jaroslav@1895
   543
                if (item == cmp) {
jaroslav@1895
   544
                    item = this;
jaroslav@1895
   545
                }
jaroslav@1890
   546
            }
jaroslav@1890
   547
jaroslav@1890
   548
            boolean isCancelled() {
jaroslav@1890
   549
                return item == this;
jaroslav@1890
   550
            }
jaroslav@1890
   551
jaroslav@1890
   552
            /**
jaroslav@1890
   553
             * Returns true if this node is known to be off the queue
jaroslav@1890
   554
             * because its next pointer has been forgotten due to
jaroslav@1890
   555
             * an advanceHead operation.
jaroslav@1890
   556
             */
jaroslav@1890
   557
            boolean isOffList() {
jaroslav@1890
   558
                return next == this;
jaroslav@1890
   559
            }
jaroslav@1890
   560
        }
jaroslav@1890
   561
jaroslav@1890
   562
        /** Head of queue */
jaroslav@1890
   563
        transient volatile QNode head;
jaroslav@1890
   564
        /** Tail of queue */
jaroslav@1890
   565
        transient volatile QNode tail;
jaroslav@1890
   566
        /**
jaroslav@1890
   567
         * Reference to a cancelled node that might not yet have been
jaroslav@1890
   568
         * unlinked from queue because it was the last inserted node
jaroslav@1890
   569
         * when it cancelled.
jaroslav@1890
   570
         */
jaroslav@1890
   571
        transient volatile QNode cleanMe;
jaroslav@1890
   572
jaroslav@1890
   573
        TransferQueue() {
jaroslav@1890
   574
            QNode h = new QNode(null, false); // initialize to dummy node.
jaroslav@1890
   575
            head = h;
jaroslav@1890
   576
            tail = h;
jaroslav@1890
   577
        }
jaroslav@1890
   578
jaroslav@1890
   579
        /**
jaroslav@1890
   580
         * Tries to cas nh as new head; if successful, unlink
jaroslav@1890
   581
         * old head's next node to avoid garbage retention.
jaroslav@1890
   582
         */
jaroslav@1890
   583
        void advanceHead(QNode h, QNode nh) {
jaroslav@1895
   584
            if (head == h) {
jaroslav@1895
   585
                head = nh;
jaroslav@1890
   586
                h.next = h; // forget old next
jaroslav@1895
   587
            }
jaroslav@1890
   588
        }
jaroslav@1890
   589
jaroslav@1890
   590
        /**
jaroslav@1890
   591
         * Tries to cas nt as new tail.
jaroslav@1890
   592
         */
jaroslav@1890
   593
        void advanceTail(QNode t, QNode nt) {
jaroslav@1895
   594
            if (tail == t) {
jaroslav@1895
   595
                tail = nt;
jaroslav@1895
   596
            }
jaroslav@1890
   597
        }
jaroslav@1890
   598
jaroslav@1890
   599
        /**
jaroslav@1890
   600
         * Tries to CAS cleanMe slot.
jaroslav@1890
   601
         */
jaroslav@1890
   602
        boolean casCleanMe(QNode cmp, QNode val) {
jaroslav@1895
   603
            if (cleanMe == cmp) {
jaroslav@1895
   604
                cleanMe = val;
jaroslav@1895
   605
                return true;
jaroslav@1895
   606
            }
jaroslav@1895
   607
            return false;
jaroslav@1890
   608
        }
jaroslav@1890
   609
jaroslav@1890
   610
        /**
jaroslav@1890
   611
         * Puts or takes an item.
jaroslav@1890
   612
         */
jaroslav@1890
   613
        Object transfer(Object e, boolean timed, long nanos) {
jaroslav@1890
   614
            /* Basic algorithm is to loop trying to take either of
jaroslav@1890
   615
             * two actions:
jaroslav@1890
   616
             *
jaroslav@1890
   617
             * 1. If queue apparently empty or holding same-mode nodes,
jaroslav@1890
   618
             *    try to add node to queue of waiters, wait to be
jaroslav@1890
   619
             *    fulfilled (or cancelled) and return matching item.
jaroslav@1890
   620
             *
jaroslav@1890
   621
             * 2. If queue apparently contains waiting items, and this
jaroslav@1890
   622
             *    call is of complementary mode, try to fulfill by CAS'ing
jaroslav@1890
   623
             *    item field of waiting node and dequeuing it, and then
jaroslav@1890
   624
             *    returning matching item.
jaroslav@1890
   625
             *
jaroslav@1890
   626
             * In each case, along the way, check for and try to help
jaroslav@1890
   627
             * advance head and tail on behalf of other stalled/slow
jaroslav@1890
   628
             * threads.
jaroslav@1890
   629
             *
jaroslav@1890
   630
             * The loop starts off with a null check guarding against
jaroslav@1890
   631
             * seeing uninitialized head or tail values. This never
jaroslav@1890
   632
             * happens in current SynchronousQueue, but could if
jaroslav@1890
   633
             * callers held non-volatile/final ref to the
jaroslav@1890
   634
             * transferer. The check is here anyway because it places
jaroslav@1890
   635
             * null checks at top of loop, which is usually faster
jaroslav@1890
   636
             * than having them implicitly interspersed.
jaroslav@1890
   637
             */
jaroslav@1890
   638
jaroslav@1890
   639
            QNode s = null; // constructed/reused as needed
jaroslav@1890
   640
            boolean isData = (e != null);
jaroslav@1890
   641
jaroslav@1890
   642
            for (;;) {
jaroslav@1890
   643
                QNode t = tail;
jaroslav@1890
   644
                QNode h = head;
jaroslav@1890
   645
                if (t == null || h == null)         // saw uninitialized value
jaroslav@1890
   646
                    continue;                       // spin
jaroslav@1890
   647
jaroslav@1890
   648
                if (h == t || t.isData == isData) { // empty or same-mode
jaroslav@1890
   649
                    QNode tn = t.next;
jaroslav@1890
   650
                    if (t != tail)                  // inconsistent read
jaroslav@1890
   651
                        continue;
jaroslav@1890
   652
                    if (tn != null) {               // lagging tail
jaroslav@1890
   653
                        advanceTail(t, tn);
jaroslav@1890
   654
                        continue;
jaroslav@1890
   655
                    }
jaroslav@1890
   656
                    if (timed && nanos <= 0)        // can't wait
jaroslav@1890
   657
                        return null;
jaroslav@1890
   658
                    if (s == null)
jaroslav@1890
   659
                        s = new QNode(e, isData);
jaroslav@1890
   660
                    if (!t.casNext(null, s))        // failed to link in
jaroslav@1890
   661
                        continue;
jaroslav@1890
   662
jaroslav@1890
   663
                    advanceTail(t, s);              // swing tail and wait
jaroslav@1890
   664
                    Object x = awaitFulfill(s, e, timed, nanos);
jaroslav@1890
   665
                    if (x == s) {                   // wait was cancelled
jaroslav@1890
   666
                        clean(t, s);
jaroslav@1890
   667
                        return null;
jaroslav@1890
   668
                    }
jaroslav@1890
   669
jaroslav@1890
   670
                    if (!s.isOffList()) {           // not already unlinked
jaroslav@1890
   671
                        advanceHead(t, s);          // unlink if head
jaroslav@1890
   672
                        if (x != null)              // and forget fields
jaroslav@1890
   673
                            s.item = s;
jaroslav@1890
   674
                        s.waiter = null;
jaroslav@1890
   675
                    }
jaroslav@1890
   676
                    return (x != null) ? x : e;
jaroslav@1890
   677
jaroslav@1890
   678
                } else {                            // complementary-mode
jaroslav@1890
   679
                    QNode m = h.next;               // node to fulfill
jaroslav@1890
   680
                    if (t != tail || m == null || h != head)
jaroslav@1890
   681
                        continue;                   // inconsistent read
jaroslav@1890
   682
jaroslav@1890
   683
                    Object x = m.item;
jaroslav@1890
   684
                    if (isData == (x != null) ||    // m already fulfilled
jaroslav@1890
   685
                        x == m ||                   // m cancelled
jaroslav@1890
   686
                        !m.casItem(x, e)) {         // lost CAS
jaroslav@1890
   687
                        advanceHead(h, m);          // dequeue and retry
jaroslav@1890
   688
                        continue;
jaroslav@1890
   689
                    }
jaroslav@1890
   690
jaroslav@1890
   691
                    advanceHead(h, m);              // successfully fulfilled
jaroslav@1890
   692
                    LockSupport.unpark(m.waiter);
jaroslav@1890
   693
                    return (x != null) ? x : e;
jaroslav@1890
   694
                }
jaroslav@1890
   695
            }
jaroslav@1890
   696
        }
jaroslav@1890
   697
jaroslav@1890
   698
        /**
jaroslav@1890
   699
         * Spins/blocks until node s is fulfilled.
jaroslav@1890
   700
         *
jaroslav@1890
   701
         * @param s the waiting node
jaroslav@1890
   702
         * @param e the comparison value for checking match
jaroslav@1890
   703
         * @param timed true if timed wait
jaroslav@1890
   704
         * @param nanos timeout value
jaroslav@1890
   705
         * @return matched item, or s if cancelled
jaroslav@1890
   706
         */
jaroslav@1890
   707
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
jaroslav@1890
   708
            /* Same idea as TransferStack.awaitFulfill */
jaroslav@1890
   709
            long lastTime = timed ? System.nanoTime() : 0;
jaroslav@1890
   710
            Thread w = Thread.currentThread();
jaroslav@1890
   711
            int spins = ((head.next == s) ?
jaroslav@1890
   712
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
jaroslav@1890
   713
            for (;;) {
jaroslav@1890
   714
                if (w.isInterrupted())
jaroslav@1890
   715
                    s.tryCancel(e);
jaroslav@1890
   716
                Object x = s.item;
jaroslav@1890
   717
                if (x != e)
jaroslav@1890
   718
                    return x;
jaroslav@1890
   719
                if (timed) {
jaroslav@1890
   720
                    long now = System.nanoTime();
jaroslav@1890
   721
                    nanos -= now - lastTime;
jaroslav@1890
   722
                    lastTime = now;
jaroslav@1890
   723
                    if (nanos <= 0) {
jaroslav@1890
   724
                        s.tryCancel(e);
jaroslav@1890
   725
                        continue;
jaroslav@1890
   726
                    }
jaroslav@1890
   727
                }
jaroslav@1890
   728
                if (spins > 0)
jaroslav@1890
   729
                    --spins;
jaroslav@1890
   730
                else if (s.waiter == null)
jaroslav@1890
   731
                    s.waiter = w;
jaroslav@1890
   732
                else if (!timed)
jaroslav@1890
   733
                    LockSupport.park(this);
jaroslav@1890
   734
                else if (nanos > spinForTimeoutThreshold)
jaroslav@1890
   735
                    LockSupport.parkNanos(this, nanos);
jaroslav@1890
   736
            }
jaroslav@1890
   737
        }
jaroslav@1890
   738
jaroslav@1890
   739
        /**
jaroslav@1890
   740
         * Gets rid of cancelled node s with original predecessor pred.
jaroslav@1890
   741
         */
jaroslav@1890
   742
        void clean(QNode pred, QNode s) {
jaroslav@1890
   743
            s.waiter = null; // forget thread
jaroslav@1890
   744
            /*
jaroslav@1890
   745
             * At any given time, exactly one node on list cannot be
jaroslav@1890
   746
             * deleted -- the last inserted node. To accommodate this,
jaroslav@1890
   747
             * if we cannot delete s, we save its predecessor as
jaroslav@1890
   748
             * "cleanMe", deleting the previously saved version
jaroslav@1890
   749
             * first. At least one of node s or the node previously
jaroslav@1890
   750
             * saved can always be deleted, so this always terminates.
jaroslav@1890
   751
             */
jaroslav@1890
   752
            while (pred.next == s) { // Return early if already unlinked
jaroslav@1890
   753
                QNode h = head;
jaroslav@1890
   754
                QNode hn = h.next;   // Absorb cancelled first node as head
jaroslav@1890
   755
                if (hn != null && hn.isCancelled()) {
jaroslav@1890
   756
                    advanceHead(h, hn);
jaroslav@1890
   757
                    continue;
jaroslav@1890
   758
                }
jaroslav@1890
   759
                QNode t = tail;      // Ensure consistent read for tail
jaroslav@1890
   760
                if (t == h)
jaroslav@1890
   761
                    return;
jaroslav@1890
   762
                QNode tn = t.next;
jaroslav@1890
   763
                if (t != tail)
jaroslav@1890
   764
                    continue;
jaroslav@1890
   765
                if (tn != null) {
jaroslav@1890
   766
                    advanceTail(t, tn);
jaroslav@1890
   767
                    continue;
jaroslav@1890
   768
                }
jaroslav@1890
   769
                if (s != t) {        // If not tail, try to unsplice
jaroslav@1890
   770
                    QNode sn = s.next;
jaroslav@1890
   771
                    if (sn == s || pred.casNext(s, sn))
jaroslav@1890
   772
                        return;
jaroslav@1890
   773
                }
jaroslav@1890
   774
                QNode dp = cleanMe;
jaroslav@1890
   775
                if (dp != null) {    // Try unlinking previous cancelled node
jaroslav@1890
   776
                    QNode d = dp.next;
jaroslav@1890
   777
                    QNode dn;
jaroslav@1890
   778
                    if (d == null ||               // d is gone or
jaroslav@1890
   779
                        d == dp ||                 // d is off list or
jaroslav@1890
   780
                        !d.isCancelled() ||        // d not cancelled or
jaroslav@1890
   781
                        (d != t &&                 // d not tail and
jaroslav@1890
   782
                         (dn = d.next) != null &&  //   has successor
jaroslav@1890
   783
                         dn != d &&                //   that is on list
jaroslav@1890
   784
                         dp.casNext(d, dn)))       // d unspliced
jaroslav@1890
   785
                        casCleanMe(dp, null);
jaroslav@1890
   786
                    if (dp == pred)
jaroslav@1890
   787
                        return;      // s is already saved node
jaroslav@1890
   788
                } else if (casCleanMe(null, pred))
jaroslav@1890
   789
                    return;          // Postpone cleaning s
jaroslav@1890
   790
            }
jaroslav@1890
   791
        }
jaroslav@1890
   792
    }
jaroslav@1890
   793
jaroslav@1890
   794
    /**
jaroslav@1890
   795
     * The transferer. Set only in constructor, but cannot be declared
jaroslav@1890
   796
     * as final without further complicating serialization.  Since
jaroslav@1890
   797
     * this is accessed only at most once per public method, there
jaroslav@1890
   798
     * isn't a noticeable performance penalty for using volatile
jaroslav@1890
   799
     * instead of final here.
jaroslav@1890
   800
     */
jaroslav@1890
   801
    private transient volatile Transferer transferer;
jaroslav@1890
   802
jaroslav@1890
   803
    /**
jaroslav@1890
   804
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
jaroslav@1890
   805
     */
jaroslav@1890
   806
    public SynchronousQueue() {
jaroslav@1890
   807
        this(false);
jaroslav@1890
   808
    }
jaroslav@1890
   809
jaroslav@1890
   810
    /**
jaroslav@1890
   811
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
jaroslav@1890
   812
     *
jaroslav@1890
   813
     * @param fair if true, waiting threads contend in FIFO order for
jaroslav@1890
   814
     *        access; otherwise the order is unspecified.
jaroslav@1890
   815
     */
jaroslav@1890
   816
    public SynchronousQueue(boolean fair) {
jaroslav@1890
   817
        transferer = fair ? new TransferQueue() : new TransferStack();
jaroslav@1890
   818
    }
jaroslav@1890
   819
jaroslav@1890
   820
    /**
jaroslav@1890
   821
     * Adds the specified element to this queue, waiting if necessary for
jaroslav@1890
   822
     * another thread to receive it.
jaroslav@1890
   823
     *
jaroslav@1890
   824
     * @throws InterruptedException {@inheritDoc}
jaroslav@1890
   825
     * @throws NullPointerException {@inheritDoc}
jaroslav@1890
   826
     */
jaroslav@1890
   827
    public void put(E o) throws InterruptedException {
jaroslav@1890
   828
        if (o == null) throw new NullPointerException();
jaroslav@1890
   829
        if (transferer.transfer(o, false, 0) == null) {
jaroslav@1890
   830
            Thread.interrupted();
jaroslav@1890
   831
            throw new InterruptedException();
jaroslav@1890
   832
        }
jaroslav@1890
   833
    }
jaroslav@1890
   834
jaroslav@1890
   835
    /**
jaroslav@1890
   836
     * Inserts the specified element into this queue, waiting if necessary
jaroslav@1890
   837
     * up to the specified wait time for another thread to receive it.
jaroslav@1890
   838
     *
jaroslav@1890
   839
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
jaroslav@1890
   840
     *         specified waiting time elapses before a consumer appears.
jaroslav@1890
   841
     * @throws InterruptedException {@inheritDoc}
jaroslav@1890
   842
     * @throws NullPointerException {@inheritDoc}
jaroslav@1890
   843
     */
jaroslav@1890
   844
    public boolean offer(E o, long timeout, TimeUnit unit)
jaroslav@1890
   845
        throws InterruptedException {
jaroslav@1890
   846
        if (o == null) throw new NullPointerException();
jaroslav@1890
   847
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
jaroslav@1890
   848
            return true;
jaroslav@1890
   849
        if (!Thread.interrupted())
jaroslav@1890
   850
            return false;
jaroslav@1890
   851
        throw new InterruptedException();
jaroslav@1890
   852
    }
jaroslav@1890
   853
jaroslav@1890
   854
    /**
jaroslav@1890
   855
     * Inserts the specified element into this queue, if another thread is
jaroslav@1890
   856
     * waiting to receive it.
jaroslav@1890
   857
     *
jaroslav@1890
   858
     * @param e the element to add
jaroslav@1890
   859
     * @return <tt>true</tt> if the element was added to this queue, else
jaroslav@1890
   860
     *         <tt>false</tt>
jaroslav@1890
   861
     * @throws NullPointerException if the specified element is null
jaroslav@1890
   862
     */
jaroslav@1890
   863
    public boolean offer(E e) {
jaroslav@1890
   864
        if (e == null) throw new NullPointerException();
jaroslav@1890
   865
        return transferer.transfer(e, true, 0) != null;
jaroslav@1890
   866
    }
jaroslav@1890
   867
jaroslav@1890
   868
    /**
jaroslav@1890
   869
     * Retrieves and removes the head of this queue, waiting if necessary
jaroslav@1890
   870
     * for another thread to insert it.
jaroslav@1890
   871
     *
jaroslav@1890
   872
     * @return the head of this queue
jaroslav@1890
   873
     * @throws InterruptedException {@inheritDoc}
jaroslav@1890
   874
     */
jaroslav@1890
   875
    public E take() throws InterruptedException {
jaroslav@1890
   876
        Object e = transferer.transfer(null, false, 0);
jaroslav@1890
   877
        if (e != null)
jaroslav@1890
   878
            return (E)e;
jaroslav@1890
   879
        Thread.interrupted();
jaroslav@1890
   880
        throw new InterruptedException();
jaroslav@1890
   881
    }
jaroslav@1890
   882
jaroslav@1890
   883
    /**
jaroslav@1890
   884
     * Retrieves and removes the head of this queue, waiting
jaroslav@1890
   885
     * if necessary up to the specified wait time, for another thread
jaroslav@1890
   886
     * to insert it.
jaroslav@1890
   887
     *
jaroslav@1890
   888
     * @return the head of this queue, or <tt>null</tt> if the
jaroslav@1890
   889
     *         specified waiting time elapses before an element is present.
jaroslav@1890
   890
     * @throws InterruptedException {@inheritDoc}
jaroslav@1890
   891
     */
jaroslav@1890
   892
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
jaroslav@1890
   893
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
jaroslav@1890
   894
        if (e != null || !Thread.interrupted())
jaroslav@1890
   895
            return (E)e;
jaroslav@1890
   896
        throw new InterruptedException();
jaroslav@1890
   897
    }
jaroslav@1890
   898
jaroslav@1890
   899
    /**
jaroslav@1890
   900
     * Retrieves and removes the head of this queue, if another thread
jaroslav@1890
   901
     * is currently making an element available.
jaroslav@1890
   902
     *
jaroslav@1890
   903
     * @return the head of this queue, or <tt>null</tt> if no
jaroslav@1890
   904
     *         element is available.
jaroslav@1890
   905
     */
jaroslav@1890
   906
    public E poll() {
jaroslav@1890
   907
        return (E)transferer.transfer(null, true, 0);
jaroslav@1890
   908
    }
jaroslav@1890
   909
jaroslav@1890
   910
    /**
jaroslav@1890
   911
     * Always returns <tt>true</tt>.
jaroslav@1890
   912
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   913
     *
jaroslav@1890
   914
     * @return <tt>true</tt>
jaroslav@1890
   915
     */
jaroslav@1890
   916
    public boolean isEmpty() {
jaroslav@1890
   917
        return true;
jaroslav@1890
   918
    }
jaroslav@1890
   919
jaroslav@1890
   920
    /**
jaroslav@1890
   921
     * Always returns zero.
jaroslav@1890
   922
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   923
     *
jaroslav@1890
   924
     * @return zero.
jaroslav@1890
   925
     */
jaroslav@1890
   926
    public int size() {
jaroslav@1890
   927
        return 0;
jaroslav@1890
   928
    }
jaroslav@1890
   929
jaroslav@1890
   930
    /**
jaroslav@1890
   931
     * Always returns zero.
jaroslav@1890
   932
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   933
     *
jaroslav@1890
   934
     * @return zero.
jaroslav@1890
   935
     */
jaroslav@1890
   936
    public int remainingCapacity() {
jaroslav@1890
   937
        return 0;
jaroslav@1890
   938
    }
jaroslav@1890
   939
jaroslav@1890
   940
    /**
jaroslav@1890
   941
     * Does nothing.
jaroslav@1890
   942
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   943
     */
jaroslav@1890
   944
    public void clear() {
jaroslav@1890
   945
    }
jaroslav@1890
   946
jaroslav@1890
   947
    /**
jaroslav@1890
   948
     * Always returns <tt>false</tt>.
jaroslav@1890
   949
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   950
     *
jaroslav@1890
   951
     * @param o the element
jaroslav@1890
   952
     * @return <tt>false</tt>
jaroslav@1890
   953
     */
jaroslav@1890
   954
    public boolean contains(Object o) {
jaroslav@1890
   955
        return false;
jaroslav@1890
   956
    }
jaroslav@1890
   957
jaroslav@1890
   958
    /**
jaroslav@1890
   959
     * Always returns <tt>false</tt>.
jaroslav@1890
   960
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   961
     *
jaroslav@1890
   962
     * @param o the element to remove
jaroslav@1890
   963
     * @return <tt>false</tt>
jaroslav@1890
   964
     */
jaroslav@1890
   965
    public boolean remove(Object o) {
jaroslav@1890
   966
        return false;
jaroslav@1890
   967
    }
jaroslav@1890
   968
jaroslav@1890
   969
    /**
jaroslav@1890
   970
     * Returns <tt>false</tt> unless the given collection is empty.
jaroslav@1890
   971
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   972
     *
jaroslav@1890
   973
     * @param c the collection
jaroslav@1890
   974
     * @return <tt>false</tt> unless given collection is empty
jaroslav@1890
   975
     */
jaroslav@1890
   976
    public boolean containsAll(Collection<?> c) {
jaroslav@1890
   977
        return c.isEmpty();
jaroslav@1890
   978
    }
jaroslav@1890
   979
jaroslav@1890
   980
    /**
jaroslav@1890
   981
     * Always returns <tt>false</tt>.
jaroslav@1890
   982
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   983
     *
jaroslav@1890
   984
     * @param c the collection
jaroslav@1890
   985
     * @return <tt>false</tt>
jaroslav@1890
   986
     */
jaroslav@1890
   987
    public boolean removeAll(Collection<?> c) {
jaroslav@1890
   988
        return false;
jaroslav@1890
   989
    }
jaroslav@1890
   990
jaroslav@1890
   991
    /**
jaroslav@1890
   992
     * Always returns <tt>false</tt>.
jaroslav@1890
   993
     * A <tt>SynchronousQueue</tt> has no internal capacity.
jaroslav@1890
   994
     *
jaroslav@1890
   995
     * @param c the collection
jaroslav@1890
   996
     * @return <tt>false</tt>
jaroslav@1890
   997
     */
jaroslav@1890
   998
    public boolean retainAll(Collection<?> c) {
jaroslav@1890
   999
        return false;
jaroslav@1890
  1000
    }
jaroslav@1890
  1001
jaroslav@1890
  1002
    /**
jaroslav@1890
  1003
     * Always returns <tt>null</tt>.
jaroslav@1890
  1004
     * A <tt>SynchronousQueue</tt> does not return elements
jaroslav@1890
  1005
     * unless actively waited on.
jaroslav@1890
  1006
     *
jaroslav@1890
  1007
     * @return <tt>null</tt>
jaroslav@1890
  1008
     */
jaroslav@1890
  1009
    public E peek() {
jaroslav@1890
  1010
        return null;
jaroslav@1890
  1011
    }
jaroslav@1890
  1012
jaroslav@1890
  1013
    /**
jaroslav@1890
  1014
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
jaroslav@1890
  1015
     * <tt>false</tt>.
jaroslav@1890
  1016
     *
jaroslav@1890
  1017
     * @return an empty iterator
jaroslav@1890
  1018
     */
jaroslav@1890
  1019
    public Iterator<E> iterator() {
jaroslav@1890
  1020
        return Collections.emptyIterator();
jaroslav@1890
  1021
    }
jaroslav@1890
  1022
jaroslav@1890
  1023
    /**
jaroslav@1890
  1024
     * Returns a zero-length array.
jaroslav@1890
  1025
     * @return a zero-length array
jaroslav@1890
  1026
     */
jaroslav@1890
  1027
    public Object[] toArray() {
jaroslav@1890
  1028
        return new Object[0];
jaroslav@1890
  1029
    }
jaroslav@1890
  1030
jaroslav@1890
  1031
    /**
jaroslav@1890
  1032
     * Sets the zeroeth element of the specified array to <tt>null</tt>
jaroslav@1890
  1033
     * (if the array has non-zero length) and returns it.
jaroslav@1890
  1034
     *
jaroslav@1890
  1035
     * @param a the array
jaroslav@1890
  1036
     * @return the specified array
jaroslav@1890
  1037
     * @throws NullPointerException if the specified array is null
jaroslav@1890
  1038
     */
jaroslav@1890
  1039
    public <T> T[] toArray(T[] a) {
jaroslav@1890
  1040
        if (a.length > 0)
jaroslav@1890
  1041
            a[0] = null;
jaroslav@1890
  1042
        return a;
jaroslav@1890
  1043
    }
jaroslav@1890
  1044
jaroslav@1890
  1045
    /**
jaroslav@1890
  1046
     * @throws UnsupportedOperationException {@inheritDoc}
jaroslav@1890
  1047
     * @throws ClassCastException            {@inheritDoc}
jaroslav@1890
  1048
     * @throws NullPointerException          {@inheritDoc}
jaroslav@1890
  1049
     * @throws IllegalArgumentException      {@inheritDoc}
jaroslav@1890
  1050
     */
jaroslav@1890
  1051
    public int drainTo(Collection<? super E> c) {
jaroslav@1890
  1052
        if (c == null)
jaroslav@1890
  1053
            throw new NullPointerException();
jaroslav@1890
  1054
        if (c == this)
jaroslav@1890
  1055
            throw new IllegalArgumentException();
jaroslav@1890
  1056
        int n = 0;
jaroslav@1890
  1057
        E e;
jaroslav@1890
  1058
        while ( (e = poll()) != null) {
jaroslav@1890
  1059
            c.add(e);
jaroslav@1890
  1060
            ++n;
jaroslav@1890
  1061
        }
jaroslav@1890
  1062
        return n;
jaroslav@1890
  1063
    }
jaroslav@1890
  1064
jaroslav@1890
  1065
    /**
jaroslav@1890
  1066
     * @throws UnsupportedOperationException {@inheritDoc}
jaroslav@1890
  1067
     * @throws ClassCastException            {@inheritDoc}
jaroslav@1890
  1068
     * @throws NullPointerException          {@inheritDoc}
jaroslav@1890
  1069
     * @throws IllegalArgumentException      {@inheritDoc}
jaroslav@1890
  1070
     */
jaroslav@1890
  1071
    public int drainTo(Collection<? super E> c, int maxElements) {
jaroslav@1890
  1072
        if (c == null)
jaroslav@1890
  1073
            throw new NullPointerException();
jaroslav@1890
  1074
        if (c == this)
jaroslav@1890
  1075
            throw new IllegalArgumentException();
jaroslav@1890
  1076
        int n = 0;
jaroslav@1890
  1077
        E e;
jaroslav@1890
  1078
        while (n < maxElements && (e = poll()) != null) {
jaroslav@1890
  1079
            c.add(e);
jaroslav@1890
  1080
            ++n;
jaroslav@1890
  1081
        }
jaroslav@1890
  1082
        return n;
jaroslav@1890
  1083
    }
jaroslav@1890
  1084
jaroslav@1890
  1085
    /*
jaroslav@1890
  1086
     * To cope with serialization strategy in the 1.5 version of
jaroslav@1890
  1087
     * SynchronousQueue, we declare some unused classes and fields
jaroslav@1890
  1088
     * that exist solely to enable serializability across versions.
jaroslav@1890
  1089
     * These fields are never used, so are initialized only if this
jaroslav@1890
  1090
     * object is ever serialized or deserialized.
jaroslav@1890
  1091
     */
jaroslav@1890
  1092
jaroslav@1890
  1093
    static class WaitQueue implements java.io.Serializable { }
jaroslav@1890
  1094
    static class LifoWaitQueue extends WaitQueue {
jaroslav@1890
  1095
        private static final long serialVersionUID = -3633113410248163686L;
jaroslav@1890
  1096
    }
jaroslav@1890
  1097
    static class FifoWaitQueue extends WaitQueue {
jaroslav@1890
  1098
        private static final long serialVersionUID = -3623113410248163686L;
jaroslav@1890
  1099
    }
jaroslav@1890
  1100
    private ReentrantLock qlock;
jaroslav@1890
  1101
    private WaitQueue waitingProducers;
jaroslav@1890
  1102
    private WaitQueue waitingConsumers;
jaroslav@1890
  1103
jaroslav@1890
  1104
    /**
jaroslav@1890
  1105
     * Save the state to a stream (that is, serialize it).
jaroslav@1890
  1106
     *
jaroslav@1890
  1107
     * @param s the stream
jaroslav@1890
  1108
     */
jaroslav@1890
  1109
    private void writeObject(java.io.ObjectOutputStream s)
jaroslav@1890
  1110
        throws java.io.IOException {
jaroslav@1890
  1111
        boolean fair = transferer instanceof TransferQueue;
jaroslav@1890
  1112
        if (fair) {
jaroslav@1890
  1113
            qlock = new ReentrantLock(true);
jaroslav@1890
  1114
            waitingProducers = new FifoWaitQueue();
jaroslav@1890
  1115
            waitingConsumers = new FifoWaitQueue();
jaroslav@1890
  1116
        }
jaroslav@1890
  1117
        else {
jaroslav@1890
  1118
            qlock = new ReentrantLock();
jaroslav@1890
  1119
            waitingProducers = new LifoWaitQueue();
jaroslav@1890
  1120
            waitingConsumers = new LifoWaitQueue();
jaroslav@1890
  1121
        }
jaroslav@1890
  1122
        s.defaultWriteObject();
jaroslav@1890
  1123
    }
jaroslav@1890
  1124
jaroslav@1890
  1125
    private void readObject(final java.io.ObjectInputStream s)
jaroslav@1890
  1126
        throws java.io.IOException, ClassNotFoundException {
jaroslav@1890
  1127
        s.defaultReadObject();
jaroslav@1890
  1128
        if (waitingProducers instanceof FifoWaitQueue)
jaroslav@1890
  1129
            transferer = new TransferQueue();
jaroslav@1890
  1130
        else
jaroslav@1890
  1131
            transferer = new TransferStack();
jaroslav@1890
  1132
    }
jaroslav@1890
  1133
jaroslav@1890
  1134
jaroslav@1890
  1135
}