1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/SynchronousQueue.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,1196 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea, Bill Scherer, and Michael Scott with
1.35 + * assistance from members of JCP JSR-166 Expert Group and released to
1.36 + * the public domain, as explained at
1.37 + * http://creativecommons.org/publicdomain/zero/1.0/
1.38 + */
1.39 +
1.40 +package java.util.concurrent;
1.41 +import java.util.concurrent.locks.*;
1.42 +import java.util.concurrent.atomic.*;
1.43 +import java.util.*;
1.44 +
1.45 +/**
1.46 + * A {@linkplain BlockingQueue blocking queue} in which each insert
1.47 + * operation must wait for a corresponding remove operation by another
1.48 + * thread, and vice versa. A synchronous queue does not have any
1.49 + * internal capacity, not even a capacity of one. You cannot
1.50 + * <tt>peek</tt> at a synchronous queue because an element is only
1.51 + * present when you try to remove it; you cannot insert an element
1.52 + * (using any method) unless another thread is trying to remove it;
1.53 + * you cannot iterate as there is nothing to iterate. The
1.54 + * <em>head</em> of the queue is the element that the first queued
1.55 + * inserting thread is trying to add to the queue; if there is no such
1.56 + * queued thread then no element is available for removal and
1.57 + * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
1.58 + * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
1.59 + * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
1.60 + * does not permit <tt>null</tt> elements.
1.61 + *
1.62 + * <p>Synchronous queues are similar to rendezvous channels used in
1.63 + * CSP and Ada. They are well suited for handoff designs, in which an
1.64 + * object running in one thread must sync up with an object running
1.65 + * in another thread in order to hand it some information, event, or
1.66 + * task.
1.67 + *
1.68 + * <p> This class supports an optional fairness policy for ordering
1.69 + * waiting producer and consumer threads. By default, this ordering
1.70 + * is not guaranteed. However, a queue constructed with fairness set
1.71 + * to <tt>true</tt> grants threads access in FIFO order.
1.72 + *
1.73 + * <p>This class and its iterator implement all of the
1.74 + * <em>optional</em> methods of the {@link Collection} and {@link
1.75 + * Iterator} interfaces.
1.76 + *
1.77 + * <p>This class is a member of the
1.78 + * <a href="{@docRoot}/../technotes/guides/collections/index.html">
1.79 + * Java Collections Framework</a>.
1.80 + *
1.81 + * @since 1.5
1.82 + * @author Doug Lea and Bill Scherer and Michael Scott
1.83 + * @param <E> the type of elements held in this collection
1.84 + */
1.85 +public class SynchronousQueue<E> extends AbstractQueue<E>
1.86 + implements BlockingQueue<E>, java.io.Serializable {
1.87 + private static final long serialVersionUID = -3223113410248163686L;
1.88 +
1.89 + /*
1.90 + * This class implements extensions of the dual stack and dual
1.91 + * queue algorithms described in "Nonblocking Concurrent Objects
1.92 + * with Condition Synchronization", by W. N. Scherer III and
1.93 + * M. L. Scott. 18th Annual Conf. on Distributed Computing,
1.94 + * Oct. 2004 (see also
1.95 + * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
1.96 + * The (Lifo) stack is used for non-fair mode, and the (Fifo)
1.97 + * queue for fair mode. The performance of the two is generally
1.98 + * similar. Fifo usually supports higher throughput under
1.99 + * contention but Lifo maintains higher thread locality in common
1.100 + * applications.
1.101 + *
1.102 + * A dual queue (and similarly stack) is one that at any given
1.103 + * time either holds "data" -- items provided by put operations,
1.104 + * or "requests" -- slots representing take operations, or is
1.105 + * empty. A call to "fulfill" (i.e., a call requesting an item
1.106 + * from a queue holding data or vice versa) dequeues a
1.107 + * complementary node. The most interesting feature of these
1.108 + * queues is that any operation can figure out which mode the
1.109 + * queue is in, and act accordingly without needing locks.
1.110 + *
1.111 + * Both the queue and stack extend abstract class Transferer
1.112 + * defining the single method transfer that does a put or a
1.113 + * take. These are unified into a single method because in dual
1.114 + * data structures, the put and take operations are symmetrical,
1.115 + * so nearly all code can be combined. The resulting transfer
1.116 + * methods are on the long side, but are easier to follow than
1.117 + * they would be if broken up into nearly-duplicated parts.
1.118 + *
1.119 + * The queue and stack data structures share many conceptual
1.120 + * similarities but very few concrete details. For simplicity,
1.121 + * they are kept distinct so that they can later evolve
1.122 + * separately.
1.123 + *
1.124 + * The algorithms here differ from the versions in the above paper
1.125 + * in extending them for use in synchronous queues, as well as
1.126 + * dealing with cancellation. The main differences include:
1.127 + *
1.128 + * 1. The original algorithms used bit-marked pointers, but
1.129 + * the ones here use mode bits in nodes, leading to a number
1.130 + * of further adaptations.
1.131 + * 2. SynchronousQueues must block threads waiting to become
1.132 + * fulfilled.
1.133 + * 3. Support for cancellation via timeout and interrupts,
1.134 + * including cleaning out cancelled nodes/threads
1.135 + * from lists to avoid garbage retention and memory depletion.
1.136 + *
1.137 + * Blocking is mainly accomplished using LockSupport park/unpark,
1.138 + * except that nodes that appear to be the next ones to become
1.139 + * fulfilled first spin a bit (on multiprocessors only). On very
1.140 + * busy synchronous queues, spinning can dramatically improve
1.141 + * throughput. And on less busy ones, the amount of spinning is
1.142 + * small enough not to be noticeable.
1.143 + *
1.144 + * Cleaning is done in different ways in queues vs stacks. For
1.145 + * queues, we can almost always remove a node immediately in O(1)
1.146 + * time (modulo retries for consistency checks) when it is
1.147 + * cancelled. But if it may be pinned as the current tail, it must
1.148 + * wait until some subsequent cancellation. For stacks, we need a
1.149 + * potentially O(n) traversal to be sure that we can remove the
1.150 + * node, but this can run concurrently with other threads
1.151 + * accessing the stack.
1.152 + *
1.153 + * While garbage collection takes care of most node reclamation
1.154 + * issues that otherwise complicate nonblocking algorithms, care
1.155 + * is taken to "forget" references to data, other nodes, and
1.156 + * threads that might be held on to long-term by blocked
1.157 + * threads. In cases where setting to null would otherwise
1.158 + * conflict with main algorithms, this is done by changing a
1.159 + * node's link to now point to the node itself. This doesn't arise
1.160 + * much for Stack nodes (because blocked threads do not hang on to
1.161 + * old head pointers), but references in Queue nodes must be
1.162 + * aggressively forgotten to avoid reachability of everything any
1.163 + * node has ever referred to since arrival.
1.164 + */
1.165 +
1.166 + /**
1.167 + * Shared internal API for dual stacks and queues.
1.168 + */
1.169 + abstract static class Transferer {
1.170 + /**
1.171 + * Performs a put or take.
1.172 + *
1.173 + * @param e if non-null, the item to be handed to a consumer;
1.174 + * if null, requests that transfer return an item
1.175 + * offered by producer.
1.176 + * @param timed if this operation should timeout
1.177 + * @param nanos the timeout, in nanoseconds
1.178 + * @return if non-null, the item provided or received; if null,
1.179 + * the operation failed due to timeout or interrupt --
1.180 + * the caller can distinguish which of these occurred
1.181 + * by checking Thread.interrupted.
1.182 + */
1.183 + abstract Object transfer(Object e, boolean timed, long nanos);
1.184 + }
1.185 +
1.186 + /** The number of CPUs, for spin control */
1.187 + static final int NCPUS = Runtime.getRuntime().availableProcessors();
1.188 +
1.189 + /**
1.190 + * The number of times to spin before blocking in timed waits.
1.191 + * The value is empirically derived -- it works well across a
1.192 + * variety of processors and OSes. Empirically, the best value
1.193 + * seems not to vary with number of CPUs (beyond 2) so is just
1.194 + * a constant.
1.195 + */
1.196 + static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
1.197 +
1.198 + /**
1.199 + * The number of times to spin before blocking in untimed waits.
1.200 + * This is greater than timed value because untimed waits spin
1.201 + * faster since they don't need to check times on each spin.
1.202 + */
1.203 + static final int maxUntimedSpins = maxTimedSpins * 16;
1.204 +
1.205 + /**
1.206 + * The number of nanoseconds for which it is faster to spin
1.207 + * rather than to use timed park. A rough estimate suffices.
1.208 + */
1.209 + static final long spinForTimeoutThreshold = 1000L;
1.210 +
1.211 + /** Dual stack */
1.212 + static final class TransferStack extends Transferer {
1.213 + /*
1.214 + * This extends Scherer-Scott dual stack algorithm, differing,
1.215 + * among other ways, by using "covering" nodes rather than
1.216 + * bit-marked pointers: Fulfilling operations push on marker
1.217 + * nodes (with FULFILLING bit set in mode) to reserve a spot
1.218 + * to match a waiting node.
1.219 + */
1.220 +
1.221 + /* Modes for SNodes, ORed together in node fields */
1.222 + /** Node represents an unfulfilled consumer */
1.223 + static final int REQUEST = 0;
1.224 + /** Node represents an unfulfilled producer */
1.225 + static final int DATA = 1;
1.226 + /** Node is fulfilling another unfulfilled DATA or REQUEST */
1.227 + static final int FULFILLING = 2;
1.228 +
1.229 + /** Return true if m has fulfilling bit set */
1.230 + static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
1.231 +
1.232 + /** Node class for TransferStacks. */
1.233 + static final class SNode {
1.234 + volatile SNode next; // next node in stack
1.235 + volatile SNode match; // the node matched to this
1.236 + volatile Thread waiter; // to control park/unpark
1.237 + Object item; // data; or null for REQUESTs
1.238 + int mode;
1.239 + // Note: item and mode fields don't need to be volatile
1.240 + // since they are always written before, and read after,
1.241 + // other volatile/atomic operations.
1.242 +
1.243 + SNode(Object item) {
1.244 + this.item = item;
1.245 + }
1.246 +
1.247 + boolean casNext(SNode cmp, SNode val) {
1.248 + return cmp == next &&
1.249 + UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
1.250 + }
1.251 +
1.252 + /**
1.253 + * Tries to match node s to this node, if so, waking up thread.
1.254 + * Fulfillers call tryMatch to identify their waiters.
1.255 + * Waiters block until they have been matched.
1.256 + *
1.257 + * @param s the node to match
1.258 + * @return true if successfully matched to s
1.259 + */
1.260 + boolean tryMatch(SNode s) {
1.261 + if (match == null &&
1.262 + UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
1.263 + Thread w = waiter;
1.264 + if (w != null) { // waiters need at most one unpark
1.265 + waiter = null;
1.266 + LockSupport.unpark(w);
1.267 + }
1.268 + return true;
1.269 + }
1.270 + return match == s;
1.271 + }
1.272 +
1.273 + /**
1.274 + * Tries to cancel a wait by matching node to itself.
1.275 + */
1.276 + void tryCancel() {
1.277 + UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
1.278 + }
1.279 +
1.280 + boolean isCancelled() {
1.281 + return match == this;
1.282 + }
1.283 +
1.284 + // Unsafe mechanics
1.285 + private static final sun.misc.Unsafe UNSAFE;
1.286 + private static final long matchOffset;
1.287 + private static final long nextOffset;
1.288 +
1.289 + static {
1.290 + try {
1.291 + UNSAFE = sun.misc.Unsafe.getUnsafe();
1.292 + Class k = SNode.class;
1.293 + matchOffset = UNSAFE.objectFieldOffset
1.294 + (k.getDeclaredField("match"));
1.295 + nextOffset = UNSAFE.objectFieldOffset
1.296 + (k.getDeclaredField("next"));
1.297 + } catch (Exception e) {
1.298 + throw new Error(e);
1.299 + }
1.300 + }
1.301 + }
1.302 +
1.303 + /** The head (top) of the stack */
1.304 + volatile SNode head;
1.305 +
1.306 + boolean casHead(SNode h, SNode nh) {
1.307 + return h == head &&
1.308 + UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
1.309 + }
1.310 +
1.311 + /**
1.312 + * Creates or resets fields of a node. Called only from transfer
1.313 + * where the node to push on stack is lazily created and
1.314 + * reused when possible to help reduce intervals between reads
1.315 + * and CASes of head and to avoid surges of garbage when CASes
1.316 + * to push nodes fail due to contention.
1.317 + */
1.318 + static SNode snode(SNode s, Object e, SNode next, int mode) {
1.319 + if (s == null) s = new SNode(e);
1.320 + s.mode = mode;
1.321 + s.next = next;
1.322 + return s;
1.323 + }
1.324 +
1.325 + /**
1.326 + * Puts or takes an item.
1.327 + */
1.328 + Object transfer(Object e, boolean timed, long nanos) {
1.329 + /*
1.330 + * Basic algorithm is to loop trying one of three actions:
1.331 + *
1.332 + * 1. If apparently empty or already containing nodes of same
1.333 + * mode, try to push node on stack and wait for a match,
1.334 + * returning it, or null if cancelled.
1.335 + *
1.336 + * 2. If apparently containing node of complementary mode,
1.337 + * try to push a fulfilling node on to stack, match
1.338 + * with corresponding waiting node, pop both from
1.339 + * stack, and return matched item. The matching or
1.340 + * unlinking might not actually be necessary because of
1.341 + * other threads performing action 3:
1.342 + *
1.343 + * 3. If top of stack already holds another fulfilling node,
1.344 + * help it out by doing its match and/or pop
1.345 + * operations, and then continue. The code for helping
1.346 + * is essentially the same as for fulfilling, except
1.347 + * that it doesn't return the item.
1.348 + */
1.349 +
1.350 + SNode s = null; // constructed/reused as needed
1.351 + int mode = (e == null) ? REQUEST : DATA;
1.352 +
1.353 + for (;;) {
1.354 + SNode h = head;
1.355 + if (h == null || h.mode == mode) { // empty or same-mode
1.356 + if (timed && nanos <= 0) { // can't wait
1.357 + if (h != null && h.isCancelled())
1.358 + casHead(h, h.next); // pop cancelled node
1.359 + else
1.360 + return null;
1.361 + } else if (casHead(h, s = snode(s, e, h, mode))) {
1.362 + SNode m = awaitFulfill(s, timed, nanos);
1.363 + if (m == s) { // wait was cancelled
1.364 + clean(s);
1.365 + return null;
1.366 + }
1.367 + if ((h = head) != null && h.next == s)
1.368 + casHead(h, s.next); // help s's fulfiller
1.369 + return (mode == REQUEST) ? m.item : s.item;
1.370 + }
1.371 + } else if (!isFulfilling(h.mode)) { // try to fulfill
1.372 + if (h.isCancelled()) // already cancelled
1.373 + casHead(h, h.next); // pop and retry
1.374 + else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
1.375 + for (;;) { // loop until matched or waiters disappear
1.376 + SNode m = s.next; // m is s's match
1.377 + if (m == null) { // all waiters are gone
1.378 + casHead(s, null); // pop fulfill node
1.379 + s = null; // use new node next time
1.380 + break; // restart main loop
1.381 + }
1.382 + SNode mn = m.next;
1.383 + if (m.tryMatch(s)) {
1.384 + casHead(s, mn); // pop both s and m
1.385 + return (mode == REQUEST) ? m.item : s.item;
1.386 + } else // lost match
1.387 + s.casNext(m, mn); // help unlink
1.388 + }
1.389 + }
1.390 + } else { // help a fulfiller
1.391 + SNode m = h.next; // m is h's match
1.392 + if (m == null) // waiter is gone
1.393 + casHead(h, null); // pop fulfilling node
1.394 + else {
1.395 + SNode mn = m.next;
1.396 + if (m.tryMatch(h)) // help match
1.397 + casHead(h, mn); // pop both h and m
1.398 + else // lost match
1.399 + h.casNext(m, mn); // help unlink
1.400 + }
1.401 + }
1.402 + }
1.403 + }
1.404 +
1.405 + /**
1.406 + * Spins/blocks until node s is matched by a fulfill operation.
1.407 + *
1.408 + * @param s the waiting node
1.409 + * @param timed true if timed wait
1.410 + * @param nanos timeout value
1.411 + * @return matched node, or s if cancelled
1.412 + */
1.413 + SNode awaitFulfill(SNode s, boolean timed, long nanos) {
1.414 + /*
1.415 + * When a node/thread is about to block, it sets its waiter
1.416 + * field and then rechecks state at least one more time
1.417 + * before actually parking, thus covering race vs
1.418 + * fulfiller noticing that waiter is non-null so should be
1.419 + * woken.
1.420 + *
1.421 + * When invoked by nodes that appear at the point of call
1.422 + * to be at the head of the stack, calls to park are
1.423 + * preceded by spins to avoid blocking when producers and
1.424 + * consumers are arriving very close in time. This can
1.425 + * happen enough to bother only on multiprocessors.
1.426 + *
1.427 + * The order of checks for returning out of main loop
1.428 + * reflects fact that interrupts have precedence over
1.429 + * normal returns, which have precedence over
1.430 + * timeouts. (So, on timeout, one last check for match is
1.431 + * done before giving up.) Except that calls from untimed
1.432 + * SynchronousQueue.{poll/offer} don't check interrupts
1.433 + * and don't wait at all, so are trapped in transfer
1.434 + * method rather than calling awaitFulfill.
1.435 + */
1.436 + long lastTime = timed ? System.nanoTime() : 0;
1.437 + Thread w = Thread.currentThread();
1.438 + SNode h = head;
1.439 + int spins = (shouldSpin(s) ?
1.440 + (timed ? maxTimedSpins : maxUntimedSpins) : 0);
1.441 + for (;;) {
1.442 + if (w.isInterrupted())
1.443 + s.tryCancel();
1.444 + SNode m = s.match;
1.445 + if (m != null)
1.446 + return m;
1.447 + if (timed) {
1.448 + long now = System.nanoTime();
1.449 + nanos -= now - lastTime;
1.450 + lastTime = now;
1.451 + if (nanos <= 0) {
1.452 + s.tryCancel();
1.453 + continue;
1.454 + }
1.455 + }
1.456 + if (spins > 0)
1.457 + spins = shouldSpin(s) ? (spins-1) : 0;
1.458 + else if (s.waiter == null)
1.459 + s.waiter = w; // establish waiter so can park next iter
1.460 + else if (!timed)
1.461 + LockSupport.park(this);
1.462 + else if (nanos > spinForTimeoutThreshold)
1.463 + LockSupport.parkNanos(this, nanos);
1.464 + }
1.465 + }
1.466 +
1.467 + /**
1.468 + * Returns true if node s is at head or there is an active
1.469 + * fulfiller.
1.470 + */
1.471 + boolean shouldSpin(SNode s) {
1.472 + SNode h = head;
1.473 + return (h == s || h == null || isFulfilling(h.mode));
1.474 + }
1.475 +
1.476 + /**
1.477 + * Unlinks s from the stack.
1.478 + */
1.479 + void clean(SNode s) {
1.480 + s.item = null; // forget item
1.481 + s.waiter = null; // forget thread
1.482 +
1.483 + /*
1.484 + * At worst we may need to traverse entire stack to unlink
1.485 + * s. If there are multiple concurrent calls to clean, we
1.486 + * might not see s if another thread has already removed
1.487 + * it. But we can stop when we see any node known to
1.488 + * follow s. We use s.next unless it too is cancelled, in
1.489 + * which case we try the node one past. We don't check any
1.490 + * further because we don't want to doubly traverse just to
1.491 + * find sentinel.
1.492 + */
1.493 +
1.494 + SNode past = s.next;
1.495 + if (past != null && past.isCancelled())
1.496 + past = past.next;
1.497 +
1.498 + // Absorb cancelled nodes at head
1.499 + SNode p;
1.500 + while ((p = head) != null && p != past && p.isCancelled())
1.501 + casHead(p, p.next);
1.502 +
1.503 + // Unsplice embedded nodes
1.504 + while (p != null && p != past) {
1.505 + SNode n = p.next;
1.506 + if (n != null && n.isCancelled())
1.507 + p.casNext(n, n.next);
1.508 + else
1.509 + p = n;
1.510 + }
1.511 + }
1.512 +
1.513 + // Unsafe mechanics
1.514 + private static final sun.misc.Unsafe UNSAFE;
1.515 + private static final long headOffset;
1.516 + static {
1.517 + try {
1.518 + UNSAFE = sun.misc.Unsafe.getUnsafe();
1.519 + Class k = TransferStack.class;
1.520 + headOffset = UNSAFE.objectFieldOffset
1.521 + (k.getDeclaredField("head"));
1.522 + } catch (Exception e) {
1.523 + throw new Error(e);
1.524 + }
1.525 + }
1.526 + }
1.527 +
1.528 + /** Dual Queue */
1.529 + static final class TransferQueue extends Transferer {
1.530 + /*
1.531 + * This extends Scherer-Scott dual queue algorithm, differing,
1.532 + * among other ways, by using modes within nodes rather than
1.533 + * marked pointers. The algorithm is a little simpler than
1.534 + * that for stacks because fulfillers do not need explicit
1.535 + * nodes, and matching is done by CAS'ing QNode.item field
1.536 + * from non-null to null (for put) or vice versa (for take).
1.537 + */
1.538 +
1.539 + /** Node class for TransferQueue. */
1.540 + static final class QNode {
1.541 + volatile QNode next; // next node in queue
1.542 + volatile Object item; // CAS'ed to or from null
1.543 + volatile Thread waiter; // to control park/unpark
1.544 + final boolean isData;
1.545 +
1.546 + QNode(Object item, boolean isData) {
1.547 + this.item = item;
1.548 + this.isData = isData;
1.549 + }
1.550 +
1.551 + boolean casNext(QNode cmp, QNode val) {
1.552 + return next == cmp &&
1.553 + UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
1.554 + }
1.555 +
1.556 + boolean casItem(Object cmp, Object val) {
1.557 + return item == cmp &&
1.558 + UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
1.559 + }
1.560 +
1.561 + /**
1.562 + * Tries to cancel by CAS'ing ref to this as item.
1.563 + */
1.564 + void tryCancel(Object cmp) {
1.565 + UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
1.566 + }
1.567 +
1.568 + boolean isCancelled() {
1.569 + return item == this;
1.570 + }
1.571 +
1.572 + /**
1.573 + * Returns true if this node is known to be off the queue
1.574 + * because its next pointer has been forgotten due to
1.575 + * an advanceHead operation.
1.576 + */
1.577 + boolean isOffList() {
1.578 + return next == this;
1.579 + }
1.580 +
1.581 + // Unsafe mechanics
1.582 + private static final sun.misc.Unsafe UNSAFE;
1.583 + private static final long itemOffset;
1.584 + private static final long nextOffset;
1.585 +
1.586 + static {
1.587 + try {
1.588 + UNSAFE = sun.misc.Unsafe.getUnsafe();
1.589 + Class k = QNode.class;
1.590 + itemOffset = UNSAFE.objectFieldOffset
1.591 + (k.getDeclaredField("item"));
1.592 + nextOffset = UNSAFE.objectFieldOffset
1.593 + (k.getDeclaredField("next"));
1.594 + } catch (Exception e) {
1.595 + throw new Error(e);
1.596 + }
1.597 + }
1.598 + }
1.599 +
1.600 + /** Head of queue */
1.601 + transient volatile QNode head;
1.602 + /** Tail of queue */
1.603 + transient volatile QNode tail;
1.604 + /**
1.605 + * Reference to a cancelled node that might not yet have been
1.606 + * unlinked from queue because it was the last inserted node
1.607 + * when it cancelled.
1.608 + */
1.609 + transient volatile QNode cleanMe;
1.610 +
1.611 + TransferQueue() {
1.612 + QNode h = new QNode(null, false); // initialize to dummy node.
1.613 + head = h;
1.614 + tail = h;
1.615 + }
1.616 +
1.617 + /**
1.618 + * Tries to cas nh as new head; if successful, unlink
1.619 + * old head's next node to avoid garbage retention.
1.620 + */
1.621 + void advanceHead(QNode h, QNode nh) {
1.622 + if (h == head &&
1.623 + UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
1.624 + h.next = h; // forget old next
1.625 + }
1.626 +
1.627 + /**
1.628 + * Tries to cas nt as new tail.
1.629 + */
1.630 + void advanceTail(QNode t, QNode nt) {
1.631 + if (tail == t)
1.632 + UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
1.633 + }
1.634 +
1.635 + /**
1.636 + * Tries to CAS cleanMe slot.
1.637 + */
1.638 + boolean casCleanMe(QNode cmp, QNode val) {
1.639 + return cleanMe == cmp &&
1.640 + UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
1.641 + }
1.642 +
1.643 + /**
1.644 + * Puts or takes an item.
1.645 + */
1.646 + Object transfer(Object e, boolean timed, long nanos) {
1.647 + /* Basic algorithm is to loop trying to take either of
1.648 + * two actions:
1.649 + *
1.650 + * 1. If queue apparently empty or holding same-mode nodes,
1.651 + * try to add node to queue of waiters, wait to be
1.652 + * fulfilled (or cancelled) and return matching item.
1.653 + *
1.654 + * 2. If queue apparently contains waiting items, and this
1.655 + * call is of complementary mode, try to fulfill by CAS'ing
1.656 + * item field of waiting node and dequeuing it, and then
1.657 + * returning matching item.
1.658 + *
1.659 + * In each case, along the way, check for and try to help
1.660 + * advance head and tail on behalf of other stalled/slow
1.661 + * threads.
1.662 + *
1.663 + * The loop starts off with a null check guarding against
1.664 + * seeing uninitialized head or tail values. This never
1.665 + * happens in current SynchronousQueue, but could if
1.666 + * callers held non-volatile/final ref to the
1.667 + * transferer. The check is here anyway because it places
1.668 + * null checks at top of loop, which is usually faster
1.669 + * than having them implicitly interspersed.
1.670 + */
1.671 +
1.672 + QNode s = null; // constructed/reused as needed
1.673 + boolean isData = (e != null);
1.674 +
1.675 + for (;;) {
1.676 + QNode t = tail;
1.677 + QNode h = head;
1.678 + if (t == null || h == null) // saw uninitialized value
1.679 + continue; // spin
1.680 +
1.681 + if (h == t || t.isData == isData) { // empty or same-mode
1.682 + QNode tn = t.next;
1.683 + if (t != tail) // inconsistent read
1.684 + continue;
1.685 + if (tn != null) { // lagging tail
1.686 + advanceTail(t, tn);
1.687 + continue;
1.688 + }
1.689 + if (timed && nanos <= 0) // can't wait
1.690 + return null;
1.691 + if (s == null)
1.692 + s = new QNode(e, isData);
1.693 + if (!t.casNext(null, s)) // failed to link in
1.694 + continue;
1.695 +
1.696 + advanceTail(t, s); // swing tail and wait
1.697 + Object x = awaitFulfill(s, e, timed, nanos);
1.698 + if (x == s) { // wait was cancelled
1.699 + clean(t, s);
1.700 + return null;
1.701 + }
1.702 +
1.703 + if (!s.isOffList()) { // not already unlinked
1.704 + advanceHead(t, s); // unlink if head
1.705 + if (x != null) // and forget fields
1.706 + s.item = s;
1.707 + s.waiter = null;
1.708 + }
1.709 + return (x != null) ? x : e;
1.710 +
1.711 + } else { // complementary-mode
1.712 + QNode m = h.next; // node to fulfill
1.713 + if (t != tail || m == null || h != head)
1.714 + continue; // inconsistent read
1.715 +
1.716 + Object x = m.item;
1.717 + if (isData == (x != null) || // m already fulfilled
1.718 + x == m || // m cancelled
1.719 + !m.casItem(x, e)) { // lost CAS
1.720 + advanceHead(h, m); // dequeue and retry
1.721 + continue;
1.722 + }
1.723 +
1.724 + advanceHead(h, m); // successfully fulfilled
1.725 + LockSupport.unpark(m.waiter);
1.726 + return (x != null) ? x : e;
1.727 + }
1.728 + }
1.729 + }
1.730 +
1.731 + /**
1.732 + * Spins/blocks until node s is fulfilled.
1.733 + *
1.734 + * @param s the waiting node
1.735 + * @param e the comparison value for checking match
1.736 + * @param timed true if timed wait
1.737 + * @param nanos timeout value
1.738 + * @return matched item, or s if cancelled
1.739 + */
1.740 + Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
1.741 + /* Same idea as TransferStack.awaitFulfill */
1.742 + long lastTime = timed ? System.nanoTime() : 0;
1.743 + Thread w = Thread.currentThread();
1.744 + int spins = ((head.next == s) ?
1.745 + (timed ? maxTimedSpins : maxUntimedSpins) : 0);
1.746 + for (;;) {
1.747 + if (w.isInterrupted())
1.748 + s.tryCancel(e);
1.749 + Object x = s.item;
1.750 + if (x != e)
1.751 + return x;
1.752 + if (timed) {
1.753 + long now = System.nanoTime();
1.754 + nanos -= now - lastTime;
1.755 + lastTime = now;
1.756 + if (nanos <= 0) {
1.757 + s.tryCancel(e);
1.758 + continue;
1.759 + }
1.760 + }
1.761 + if (spins > 0)
1.762 + --spins;
1.763 + else if (s.waiter == null)
1.764 + s.waiter = w;
1.765 + else if (!timed)
1.766 + LockSupport.park(this);
1.767 + else if (nanos > spinForTimeoutThreshold)
1.768 + LockSupport.parkNanos(this, nanos);
1.769 + }
1.770 + }
1.771 +
1.772 + /**
1.773 + * Gets rid of cancelled node s with original predecessor pred.
1.774 + */
1.775 + void clean(QNode pred, QNode s) {
1.776 + s.waiter = null; // forget thread
1.777 + /*
1.778 + * At any given time, exactly one node on list cannot be
1.779 + * deleted -- the last inserted node. To accommodate this,
1.780 + * if we cannot delete s, we save its predecessor as
1.781 + * "cleanMe", deleting the previously saved version
1.782 + * first. At least one of node s or the node previously
1.783 + * saved can always be deleted, so this always terminates.
1.784 + */
1.785 + while (pred.next == s) { // Return early if already unlinked
1.786 + QNode h = head;
1.787 + QNode hn = h.next; // Absorb cancelled first node as head
1.788 + if (hn != null && hn.isCancelled()) {
1.789 + advanceHead(h, hn);
1.790 + continue;
1.791 + }
1.792 + QNode t = tail; // Ensure consistent read for tail
1.793 + if (t == h)
1.794 + return;
1.795 + QNode tn = t.next;
1.796 + if (t != tail)
1.797 + continue;
1.798 + if (tn != null) {
1.799 + advanceTail(t, tn);
1.800 + continue;
1.801 + }
1.802 + if (s != t) { // If not tail, try to unsplice
1.803 + QNode sn = s.next;
1.804 + if (sn == s || pred.casNext(s, sn))
1.805 + return;
1.806 + }
1.807 + QNode dp = cleanMe;
1.808 + if (dp != null) { // Try unlinking previous cancelled node
1.809 + QNode d = dp.next;
1.810 + QNode dn;
1.811 + if (d == null || // d is gone or
1.812 + d == dp || // d is off list or
1.813 + !d.isCancelled() || // d not cancelled or
1.814 + (d != t && // d not tail and
1.815 + (dn = d.next) != null && // has successor
1.816 + dn != d && // that is on list
1.817 + dp.casNext(d, dn))) // d unspliced
1.818 + casCleanMe(dp, null);
1.819 + if (dp == pred)
1.820 + return; // s is already saved node
1.821 + } else if (casCleanMe(null, pred))
1.822 + return; // Postpone cleaning s
1.823 + }
1.824 + }
1.825 +
1.826 + private static final sun.misc.Unsafe UNSAFE;
1.827 + private static final long headOffset;
1.828 + private static final long tailOffset;
1.829 + private static final long cleanMeOffset;
1.830 + static {
1.831 + try {
1.832 + UNSAFE = sun.misc.Unsafe.getUnsafe();
1.833 + Class k = TransferQueue.class;
1.834 + headOffset = UNSAFE.objectFieldOffset
1.835 + (k.getDeclaredField("head"));
1.836 + tailOffset = UNSAFE.objectFieldOffset
1.837 + (k.getDeclaredField("tail"));
1.838 + cleanMeOffset = UNSAFE.objectFieldOffset
1.839 + (k.getDeclaredField("cleanMe"));
1.840 + } catch (Exception e) {
1.841 + throw new Error(e);
1.842 + }
1.843 + }
1.844 + }
1.845 +
1.846 + /**
1.847 + * The transferer. Set only in constructor, but cannot be declared
1.848 + * as final without further complicating serialization. Since
1.849 + * this is accessed only at most once per public method, there
1.850 + * isn't a noticeable performance penalty for using volatile
1.851 + * instead of final here.
1.852 + */
1.853 + private transient volatile Transferer transferer;
1.854 +
1.855 + /**
1.856 + * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
1.857 + */
1.858 + public SynchronousQueue() {
1.859 + this(false);
1.860 + }
1.861 +
1.862 + /**
1.863 + * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
1.864 + *
1.865 + * @param fair if true, waiting threads contend in FIFO order for
1.866 + * access; otherwise the order is unspecified.
1.867 + */
1.868 + public SynchronousQueue(boolean fair) {
1.869 + transferer = fair ? new TransferQueue() : new TransferStack();
1.870 + }
1.871 +
1.872 + /**
1.873 + * Adds the specified element to this queue, waiting if necessary for
1.874 + * another thread to receive it.
1.875 + *
1.876 + * @throws InterruptedException {@inheritDoc}
1.877 + * @throws NullPointerException {@inheritDoc}
1.878 + */
1.879 + public void put(E o) throws InterruptedException {
1.880 + if (o == null) throw new NullPointerException();
1.881 + if (transferer.transfer(o, false, 0) == null) {
1.882 + Thread.interrupted();
1.883 + throw new InterruptedException();
1.884 + }
1.885 + }
1.886 +
1.887 + /**
1.888 + * Inserts the specified element into this queue, waiting if necessary
1.889 + * up to the specified wait time for another thread to receive it.
1.890 + *
1.891 + * @return <tt>true</tt> if successful, or <tt>false</tt> if the
1.892 + * specified waiting time elapses before a consumer appears.
1.893 + * @throws InterruptedException {@inheritDoc}
1.894 + * @throws NullPointerException {@inheritDoc}
1.895 + */
1.896 + public boolean offer(E o, long timeout, TimeUnit unit)
1.897 + throws InterruptedException {
1.898 + if (o == null) throw new NullPointerException();
1.899 + if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
1.900 + return true;
1.901 + if (!Thread.interrupted())
1.902 + return false;
1.903 + throw new InterruptedException();
1.904 + }
1.905 +
1.906 + /**
1.907 + * Inserts the specified element into this queue, if another thread is
1.908 + * waiting to receive it.
1.909 + *
1.910 + * @param e the element to add
1.911 + * @return <tt>true</tt> if the element was added to this queue, else
1.912 + * <tt>false</tt>
1.913 + * @throws NullPointerException if the specified element is null
1.914 + */
1.915 + public boolean offer(E e) {
1.916 + if (e == null) throw new NullPointerException();
1.917 + return transferer.transfer(e, true, 0) != null;
1.918 + }
1.919 +
1.920 + /**
1.921 + * Retrieves and removes the head of this queue, waiting if necessary
1.922 + * for another thread to insert it.
1.923 + *
1.924 + * @return the head of this queue
1.925 + * @throws InterruptedException {@inheritDoc}
1.926 + */
1.927 + public E take() throws InterruptedException {
1.928 + Object e = transferer.transfer(null, false, 0);
1.929 + if (e != null)
1.930 + return (E)e;
1.931 + Thread.interrupted();
1.932 + throw new InterruptedException();
1.933 + }
1.934 +
1.935 + /**
1.936 + * Retrieves and removes the head of this queue, waiting
1.937 + * if necessary up to the specified wait time, for another thread
1.938 + * to insert it.
1.939 + *
1.940 + * @return the head of this queue, or <tt>null</tt> if the
1.941 + * specified waiting time elapses before an element is present.
1.942 + * @throws InterruptedException {@inheritDoc}
1.943 + */
1.944 + public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1.945 + Object e = transferer.transfer(null, true, unit.toNanos(timeout));
1.946 + if (e != null || !Thread.interrupted())
1.947 + return (E)e;
1.948 + throw new InterruptedException();
1.949 + }
1.950 +
1.951 + /**
1.952 + * Retrieves and removes the head of this queue, if another thread
1.953 + * is currently making an element available.
1.954 + *
1.955 + * @return the head of this queue, or <tt>null</tt> if no
1.956 + * element is available.
1.957 + */
1.958 + public E poll() {
1.959 + return (E)transferer.transfer(null, true, 0);
1.960 + }
1.961 +
1.962 + /**
1.963 + * Always returns <tt>true</tt>.
1.964 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.965 + *
1.966 + * @return <tt>true</tt>
1.967 + */
1.968 + public boolean isEmpty() {
1.969 + return true;
1.970 + }
1.971 +
1.972 + /**
1.973 + * Always returns zero.
1.974 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.975 + *
1.976 + * @return zero.
1.977 + */
1.978 + public int size() {
1.979 + return 0;
1.980 + }
1.981 +
1.982 + /**
1.983 + * Always returns zero.
1.984 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.985 + *
1.986 + * @return zero.
1.987 + */
1.988 + public int remainingCapacity() {
1.989 + return 0;
1.990 + }
1.991 +
1.992 + /**
1.993 + * Does nothing.
1.994 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.995 + */
1.996 + public void clear() {
1.997 + }
1.998 +
1.999 + /**
1.1000 + * Always returns <tt>false</tt>.
1.1001 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.1002 + *
1.1003 + * @param o the element
1.1004 + * @return <tt>false</tt>
1.1005 + */
1.1006 + public boolean contains(Object o) {
1.1007 + return false;
1.1008 + }
1.1009 +
1.1010 + /**
1.1011 + * Always returns <tt>false</tt>.
1.1012 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.1013 + *
1.1014 + * @param o the element to remove
1.1015 + * @return <tt>false</tt>
1.1016 + */
1.1017 + public boolean remove(Object o) {
1.1018 + return false;
1.1019 + }
1.1020 +
1.1021 + /**
1.1022 + * Returns <tt>false</tt> unless the given collection is empty.
1.1023 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.1024 + *
1.1025 + * @param c the collection
1.1026 + * @return <tt>false</tt> unless given collection is empty
1.1027 + */
1.1028 + public boolean containsAll(Collection<?> c) {
1.1029 + return c.isEmpty();
1.1030 + }
1.1031 +
1.1032 + /**
1.1033 + * Always returns <tt>false</tt>.
1.1034 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.1035 + *
1.1036 + * @param c the collection
1.1037 + * @return <tt>false</tt>
1.1038 + */
1.1039 + public boolean removeAll(Collection<?> c) {
1.1040 + return false;
1.1041 + }
1.1042 +
1.1043 + /**
1.1044 + * Always returns <tt>false</tt>.
1.1045 + * A <tt>SynchronousQueue</tt> has no internal capacity.
1.1046 + *
1.1047 + * @param c the collection
1.1048 + * @return <tt>false</tt>
1.1049 + */
1.1050 + public boolean retainAll(Collection<?> c) {
1.1051 + return false;
1.1052 + }
1.1053 +
1.1054 + /**
1.1055 + * Always returns <tt>null</tt>.
1.1056 + * A <tt>SynchronousQueue</tt> does not return elements
1.1057 + * unless actively waited on.
1.1058 + *
1.1059 + * @return <tt>null</tt>
1.1060 + */
1.1061 + public E peek() {
1.1062 + return null;
1.1063 + }
1.1064 +
1.1065 + /**
1.1066 + * Returns an empty iterator in which <tt>hasNext</tt> always returns
1.1067 + * <tt>false</tt>.
1.1068 + *
1.1069 + * @return an empty iterator
1.1070 + */
1.1071 + public Iterator<E> iterator() {
1.1072 + return Collections.emptyIterator();
1.1073 + }
1.1074 +
1.1075 + /**
1.1076 + * Returns a zero-length array.
1.1077 + * @return a zero-length array
1.1078 + */
1.1079 + public Object[] toArray() {
1.1080 + return new Object[0];
1.1081 + }
1.1082 +
1.1083 + /**
1.1084 + * Sets the zeroeth element of the specified array to <tt>null</tt>
1.1085 + * (if the array has non-zero length) and returns it.
1.1086 + *
1.1087 + * @param a the array
1.1088 + * @return the specified array
1.1089 + * @throws NullPointerException if the specified array is null
1.1090 + */
1.1091 + public <T> T[] toArray(T[] a) {
1.1092 + if (a.length > 0)
1.1093 + a[0] = null;
1.1094 + return a;
1.1095 + }
1.1096 +
1.1097 + /**
1.1098 + * @throws UnsupportedOperationException {@inheritDoc}
1.1099 + * @throws ClassCastException {@inheritDoc}
1.1100 + * @throws NullPointerException {@inheritDoc}
1.1101 + * @throws IllegalArgumentException {@inheritDoc}
1.1102 + */
1.1103 + public int drainTo(Collection<? super E> c) {
1.1104 + if (c == null)
1.1105 + throw new NullPointerException();
1.1106 + if (c == this)
1.1107 + throw new IllegalArgumentException();
1.1108 + int n = 0;
1.1109 + E e;
1.1110 + while ( (e = poll()) != null) {
1.1111 + c.add(e);
1.1112 + ++n;
1.1113 + }
1.1114 + return n;
1.1115 + }
1.1116 +
1.1117 + /**
1.1118 + * @throws UnsupportedOperationException {@inheritDoc}
1.1119 + * @throws ClassCastException {@inheritDoc}
1.1120 + * @throws NullPointerException {@inheritDoc}
1.1121 + * @throws IllegalArgumentException {@inheritDoc}
1.1122 + */
1.1123 + public int drainTo(Collection<? super E> c, int maxElements) {
1.1124 + if (c == null)
1.1125 + throw new NullPointerException();
1.1126 + if (c == this)
1.1127 + throw new IllegalArgumentException();
1.1128 + int n = 0;
1.1129 + E e;
1.1130 + while (n < maxElements && (e = poll()) != null) {
1.1131 + c.add(e);
1.1132 + ++n;
1.1133 + }
1.1134 + return n;
1.1135 + }
1.1136 +
1.1137 + /*
1.1138 + * To cope with serialization strategy in the 1.5 version of
1.1139 + * SynchronousQueue, we declare some unused classes and fields
1.1140 + * that exist solely to enable serializability across versions.
1.1141 + * These fields are never used, so are initialized only if this
1.1142 + * object is ever serialized or deserialized.
1.1143 + */
1.1144 +
1.1145 + static class WaitQueue implements java.io.Serializable { }
1.1146 + static class LifoWaitQueue extends WaitQueue {
1.1147 + private static final long serialVersionUID = -3633113410248163686L;
1.1148 + }
1.1149 + static class FifoWaitQueue extends WaitQueue {
1.1150 + private static final long serialVersionUID = -3623113410248163686L;
1.1151 + }
1.1152 + private ReentrantLock qlock;
1.1153 + private WaitQueue waitingProducers;
1.1154 + private WaitQueue waitingConsumers;
1.1155 +
1.1156 + /**
1.1157 + * Save the state to a stream (that is, serialize it).
1.1158 + *
1.1159 + * @param s the stream
1.1160 + */
1.1161 + private void writeObject(java.io.ObjectOutputStream s)
1.1162 + throws java.io.IOException {
1.1163 + boolean fair = transferer instanceof TransferQueue;
1.1164 + if (fair) {
1.1165 + qlock = new ReentrantLock(true);
1.1166 + waitingProducers = new FifoWaitQueue();
1.1167 + waitingConsumers = new FifoWaitQueue();
1.1168 + }
1.1169 + else {
1.1170 + qlock = new ReentrantLock();
1.1171 + waitingProducers = new LifoWaitQueue();
1.1172 + waitingConsumers = new LifoWaitQueue();
1.1173 + }
1.1174 + s.defaultWriteObject();
1.1175 + }
1.1176 +
1.1177 + private void readObject(final java.io.ObjectInputStream s)
1.1178 + throws java.io.IOException, ClassNotFoundException {
1.1179 + s.defaultReadObject();
1.1180 + if (waitingProducers instanceof FifoWaitQueue)
1.1181 + transferer = new TransferQueue();
1.1182 + else
1.1183 + transferer = new TransferStack();
1.1184 + }
1.1185 +
1.1186 + // Unsafe mechanics
1.1187 + static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1.1188 + String field, Class<?> klazz) {
1.1189 + try {
1.1190 + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1.1191 + } catch (NoSuchFieldException e) {
1.1192 + // Convert Exception to corresponding Error
1.1193 + NoSuchFieldError error = new NoSuchFieldError(field);
1.1194 + error.initCause(e);
1.1195 + throw error;
1.1196 + }
1.1197 + }
1.1198 +
1.1199 +}