1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/Exchanger.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,687 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea, Bill Scherer, and Michael Scott with
1.35 + * assistance from members of JCP JSR-166 Expert Group and released to
1.36 + * the public domain, as explained at
1.37 + * http://creativecommons.org/publicdomain/zero/1.0/
1.38 + */
1.39 +
1.40 +package java.util.concurrent;
1.41 +import java.util.concurrent.atomic.*;
1.42 +import java.util.concurrent.locks.LockSupport;
1.43 +
1.44 +/**
1.45 + * A synchronization point at which threads can pair and swap elements
1.46 + * within pairs. Each thread presents some object on entry to the
1.47 + * {@link #exchange exchange} method, matches with a partner thread,
1.48 + * and receives its partner's object on return. An Exchanger may be
1.49 + * viewed as a bidirectional form of a {@link SynchronousQueue}.
1.50 + * Exchangers may be useful in applications such as genetic algorithms
1.51 + * and pipeline designs.
1.52 + *
1.53 + * <p><b>Sample Usage:</b>
1.54 + * Here are the highlights of a class that uses an {@code Exchanger}
1.55 + * to swap buffers between threads so that the thread filling the
1.56 + * buffer gets a freshly emptied one when it needs it, handing off the
1.57 + * filled one to the thread emptying the buffer.
1.58 + * <pre>{@code
1.59 + * class FillAndEmpty {
1.60 + * Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
1.61 + * DataBuffer initialEmptyBuffer = ... a made-up type
1.62 + * DataBuffer initialFullBuffer = ...
1.63 + *
1.64 + * class FillingLoop implements Runnable {
1.65 + * public void run() {
1.66 + * DataBuffer currentBuffer = initialEmptyBuffer;
1.67 + * try {
1.68 + * while (currentBuffer != null) {
1.69 + * addToBuffer(currentBuffer);
1.70 + * if (currentBuffer.isFull())
1.71 + * currentBuffer = exchanger.exchange(currentBuffer);
1.72 + * }
1.73 + * } catch (InterruptedException ex) { ... handle ... }
1.74 + * }
1.75 + * }
1.76 + *
1.77 + * class EmptyingLoop implements Runnable {
1.78 + * public void run() {
1.79 + * DataBuffer currentBuffer = initialFullBuffer;
1.80 + * try {
1.81 + * while (currentBuffer != null) {
1.82 + * takeFromBuffer(currentBuffer);
1.83 + * if (currentBuffer.isEmpty())
1.84 + * currentBuffer = exchanger.exchange(currentBuffer);
1.85 + * }
1.86 + * } catch (InterruptedException ex) { ... handle ...}
1.87 + * }
1.88 + * }
1.89 + *
1.90 + * void start() {
1.91 + * new Thread(new FillingLoop()).start();
1.92 + * new Thread(new EmptyingLoop()).start();
1.93 + * }
1.94 + * }
1.95 + * }</pre>
1.96 + *
1.97 + * <p>Memory consistency effects: For each pair of threads that
1.98 + * successfully exchange objects via an {@code Exchanger}, actions
1.99 + * prior to the {@code exchange()} in each thread
1.100 + * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
1.101 + * those subsequent to a return from the corresponding {@code exchange()}
1.102 + * in the other thread.
1.103 + *
1.104 + * @since 1.5
1.105 + * @author Doug Lea and Bill Scherer and Michael Scott
1.106 + * @param <V> The type of objects that may be exchanged
1.107 + */
1.108 +public class Exchanger<V> {
1.109 + /*
1.110 + * Algorithm Description:
1.111 + *
1.112 + * The basic idea is to maintain a "slot", which is a reference to
1.113 + * a Node containing both an Item to offer and a "hole" waiting to
1.114 + * get filled in. If an incoming "occupying" thread sees that the
1.115 + * slot is null, it CAS'es (compareAndSets) a Node there and waits
1.116 + * for another to invoke exchange. That second "fulfilling" thread
1.117 + * sees that the slot is non-null, and so CASes it back to null,
1.118 + * also exchanging items by CASing the hole, plus waking up the
1.119 + * occupying thread if it is blocked. In each case CAS'es may
1.120 + * fail because a slot at first appears non-null but is null upon
1.121 + * CAS, or vice-versa. So threads may need to retry these
1.122 + * actions.
1.123 + *
1.124 + * This simple approach works great when there are only a few
1.125 + * threads using an Exchanger, but performance rapidly
1.126 + * deteriorates due to CAS contention on the single slot when
1.127 + * there are lots of threads using an exchanger. So instead we use
1.128 + * an "arena"; basically a kind of hash table with a dynamically
1.129 + * varying number of slots, any one of which can be used by
1.130 + * threads performing an exchange. Incoming threads pick slots
1.131 + * based on a hash of their Thread ids. If an incoming thread
1.132 + * fails to CAS in its chosen slot, it picks an alternative slot
1.133 + * instead. And similarly from there. If a thread successfully
1.134 + * CASes into a slot but no other thread arrives, it tries
1.135 + * another, heading toward the zero slot, which always exists even
1.136 + * if the table shrinks. The particular mechanics controlling this
1.137 + * are as follows:
1.138 + *
1.139 + * Waiting: Slot zero is special in that it is the only slot that
1.140 + * exists when there is no contention. A thread occupying slot
1.141 + * zero will block if no thread fulfills it after a short spin.
1.142 + * In other cases, occupying threads eventually give up and try
1.143 + * another slot. Waiting threads spin for a while (a period that
1.144 + * should be a little less than a typical context-switch time)
1.145 + * before either blocking (if slot zero) or giving up (if other
1.146 + * slots) and restarting. There is no reason for threads to block
1.147 + * unless there are unlikely to be any other threads present.
1.148 + * Occupants are mainly avoiding memory contention so sit there
1.149 + * quietly polling for a shorter period than it would take to
1.150 + * block and then unblock them. Non-slot-zero waits that elapse
1.151 + * because of lack of other threads waste around one extra
1.152 + * context-switch time per try, which is still on average much
1.153 + * faster than alternative approaches.
1.154 + *
1.155 + * Sizing: Usually, using only a few slots suffices to reduce
1.156 + * contention. Especially with small numbers of threads, using
1.157 + * too many slots can lead to just as poor performance as using
1.158 + * too few of them, and there's not much room for error. The
1.159 + * variable "max" maintains the number of slots actually in
1.160 + * use. It is increased when a thread sees too many CAS
1.161 + * failures. (This is analogous to resizing a regular hash table
1.162 + * based on a target load factor, except here, growth steps are
1.163 + * just one-by-one rather than proportional.) Growth requires
1.164 + * contention failures in each of three tried slots. Requiring
1.165 + * multiple failures for expansion copes with the fact that some
1.166 + * failed CASes are not due to contention but instead to simple
1.167 + * races between two threads or thread pre-emptions occurring
1.168 + * between reading and CASing. Also, very transient peak
1.169 + * contention can be much higher than the average sustainable
1.170 + * levels. An attempt to decrease the max limit is usually made
1.171 + * when a non-slot-zero wait elapses without being fulfilled.
1.172 + * Threads experiencing elapsed waits move closer to zero, so
1.173 + * eventually find existing (or future) threads even if the table
1.174 + * has been shrunk due to inactivity. The chosen mechanics and
1.175 + * thresholds for growing and shrinking are intrinsically
1.176 + * entangled with indexing and hashing inside the exchange code,
1.177 + * and can't be nicely abstracted out.
1.178 + *
1.179 + * Hashing: Each thread picks its initial slot to use in accord
1.180 + * with a simple hashcode. The sequence is the same on each
1.181 + * encounter by any given thread, but effectively random across
1.182 + * threads. Using arenas encounters the classic cost vs quality
1.183 + * tradeoffs of all hash tables. Here, we use a one-step FNV-1a
1.184 + * hash code based on the current thread's Thread.getId(), along
1.185 + * with a cheap approximation to a mod operation to select an
1.186 + * index. The downside of optimizing index selection in this way
1.187 + * is that the code is hardwired to use a maximum table size of
1.188 + * 32. But this value more than suffices for known platforms and
1.189 + * applications.
1.190 + *
1.191 + * Probing: On sensed contention of a selected slot, we probe
1.192 + * sequentially through the table, analogously to linear probing
1.193 + * after collision in a hash table. (We move circularly, in
1.194 + * reverse order, to mesh best with table growth and shrinkage
1.195 + * rules.) Except that to minimize the effects of false-alarms
1.196 + * and cache thrashing, we try the first selected slot twice
1.197 + * before moving.
1.198 + *
1.199 + * Padding: Even with contention management, slots are heavily
1.200 + * contended, so use cache-padding to avoid poor memory
1.201 + * performance. Because of this, slots are lazily constructed
1.202 + * only when used, to avoid wasting this space unnecessarily.
1.203 + * While isolation of locations is not much of an issue at first
1.204 + * in an application, as time goes on and garbage-collectors
1.205 + * perform compaction, slots are very likely to be moved adjacent
1.206 + * to each other, which can cause much thrashing of cache lines on
1.207 + * MPs unless padding is employed.
1.208 + *
1.209 + * This is an improvement of the algorithm described in the paper
1.210 + * "A Scalable Elimination-based Exchange Channel" by William
1.211 + * Scherer, Doug Lea, and Michael Scott in Proceedings of SCOOL05
1.212 + * workshop. Available at: http://hdl.handle.net/1802/2104
1.213 + */
1.214 +
1.215 + /** The number of CPUs, for sizing and spin control */
1.216 + private static final int NCPU = Runtime.getRuntime().availableProcessors();
1.217 +
1.218 + /**
1.219 + * The capacity of the arena. Set to a value that provides more
1.220 + * than enough space to handle contention. On small machines
1.221 + * most slots won't be used, but it is still not wasted because
1.222 + * the extra space provides some machine-level address padding
1.223 + * to minimize interference with heavily CAS'ed Slot locations.
1.224 + * And on very large machines, performance eventually becomes
1.225 + * bounded by memory bandwidth, not numbers of threads/CPUs.
1.226 + * This constant cannot be changed without also modifying
1.227 + * indexing and hashing algorithms.
1.228 + */
1.229 + private static final int CAPACITY = 32;
1.230 +
1.231 + /**
1.232 + * The value of "max" that will hold all threads without
1.233 + * contention. When this value is less than CAPACITY, some
1.234 + * otherwise wasted expansion can be avoided.
1.235 + */
1.236 + private static final int FULL =
1.237 + Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
1.238 +
1.239 + /**
1.240 + * The number of times to spin (doing nothing except polling a
1.241 + * memory location) before blocking or giving up while waiting to
1.242 + * be fulfilled. Should be zero on uniprocessors. On
1.243 + * multiprocessors, this value should be large enough so that two
1.244 + * threads exchanging items as fast as possible block only when
1.245 + * one of them is stalled (due to GC or preemption), but not much
1.246 + * longer, to avoid wasting CPU resources. Seen differently, this
1.247 + * value is a little over half the number of cycles of an average
1.248 + * context switch time on most systems. The value here is
1.249 + * approximately the average of those across a range of tested
1.250 + * systems.
1.251 + */
1.252 + private static final int SPINS = (NCPU == 1) ? 0 : 2000;
1.253 +
1.254 + /**
1.255 + * The number of times to spin before blocking in timed waits.
1.256 + * Timed waits spin more slowly because checking the time takes
1.257 + * time. The best value relies mainly on the relative rate of
1.258 + * System.nanoTime vs memory accesses. The value is empirically
1.259 + * derived to work well across a variety of systems.
1.260 + */
1.261 + private static final int TIMED_SPINS = SPINS / 20;
1.262 +
1.263 + /**
1.264 + * Sentinel item representing cancellation of a wait due to
1.265 + * interruption, timeout, or elapsed spin-waits. This value is
1.266 + * placed in holes on cancellation, and used as a return value
1.267 + * from waiting methods to indicate failure to set or get hole.
1.268 + */
1.269 + private static final Object CANCEL = new Object();
1.270 +
1.271 + /**
1.272 + * Value representing null arguments/returns from public
1.273 + * methods. This disambiguates from internal requirement that
1.274 + * holes start out as null to mean they are not yet set.
1.275 + */
1.276 + private static final Object NULL_ITEM = new Object();
1.277 +
1.278 + /**
1.279 + * Nodes hold partially exchanged data. This class
1.280 + * opportunistically subclasses AtomicReference to represent the
1.281 + * hole. So get() returns hole, and compareAndSet CAS'es value
1.282 + * into hole. This class cannot be parameterized as "V" because
1.283 + * of the use of non-V CANCEL sentinels.
1.284 + */
1.285 + private static final class Node extends AtomicReference<Object> {
1.286 + /** The element offered by the Thread creating this node. */
1.287 + public final Object item;
1.288 +
1.289 + /** The Thread waiting to be signalled; null until waiting. */
1.290 + public volatile Thread waiter;
1.291 +
1.292 + /**
1.293 + * Creates node with given item and empty hole.
1.294 + * @param item the item
1.295 + */
1.296 + public Node(Object item) {
1.297 + this.item = item;
1.298 + }
1.299 + }
1.300 +
1.301 + /**
1.302 + * A Slot is an AtomicReference with heuristic padding to lessen
1.303 + * cache effects of this heavily CAS'ed location. While the
1.304 + * padding adds noticeable space, all slots are created only on
1.305 + * demand, and there will be more than one of them only when it
1.306 + * would improve throughput more than enough to outweigh using
1.307 + * extra space.
1.308 + */
1.309 + private static final class Slot extends AtomicReference<Object> {
1.310 + // Improve likelihood of isolation on <= 64 byte cache lines
1.311 + long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
1.312 + }
1.313 +
1.314 + /**
1.315 + * Slot array. Elements are lazily initialized when needed.
1.316 + * Declared volatile to enable double-checked lazy construction.
1.317 + */
1.318 + private volatile Slot[] arena = new Slot[CAPACITY];
1.319 +
1.320 + /**
1.321 + * The maximum slot index being used. The value sometimes
1.322 + * increases when a thread experiences too many CAS contentions,
1.323 + * and sometimes decreases when a spin-wait elapses. Changes
1.324 + * are performed only via compareAndSet, to avoid stale values
1.325 + * when a thread happens to stall right before setting.
1.326 + */
1.327 + private final AtomicInteger max = new AtomicInteger();
1.328 +
1.329 + /**
1.330 + * Main exchange function, handling the different policy variants.
1.331 + * Uses Object, not "V" as argument and return value to simplify
1.332 + * handling of sentinel values. Callers from public methods decode
1.333 + * and cast accordingly.
1.334 + *
1.335 + * @param item the (non-null) item to exchange
1.336 + * @param timed true if the wait is timed
1.337 + * @param nanos if timed, the maximum wait time
1.338 + * @return the other thread's item, or CANCEL if interrupted or timed out
1.339 + */
1.340 + private Object doExchange(Object item, boolean timed, long nanos) {
1.341 + Node me = new Node(item); // Create in case occupying
1.342 + int index = hashIndex(); // Index of current slot
1.343 + int fails = 0; // Number of CAS failures
1.344 +
1.345 + for (;;) {
1.346 + Object y; // Contents of current slot
1.347 + Slot slot = arena[index];
1.348 + if (slot == null) // Lazily initialize slots
1.349 + createSlot(index); // Continue loop to reread
1.350 + else if ((y = slot.get()) != null && // Try to fulfill
1.351 + slot.compareAndSet(y, null)) {
1.352 + Node you = (Node)y; // Transfer item
1.353 + if (you.compareAndSet(null, item)) {
1.354 + LockSupport.unpark(you.waiter);
1.355 + return you.item;
1.356 + } // Else cancelled; continue
1.357 + }
1.358 + else if (y == null && // Try to occupy
1.359 + slot.compareAndSet(null, me)) {
1.360 + if (index == 0) // Blocking wait for slot 0
1.361 + return timed ?
1.362 + awaitNanos(me, slot, nanos) :
1.363 + await(me, slot);
1.364 + Object v = spinWait(me, slot); // Spin wait for non-0
1.365 + if (v != CANCEL)
1.366 + return v;
1.367 + me = new Node(item); // Throw away cancelled node
1.368 + int m = max.get();
1.369 + if (m > (index >>>= 1)) // Decrease index
1.370 + max.compareAndSet(m, m - 1); // Maybe shrink table
1.371 + }
1.372 + else if (++fails > 1) { // Allow 2 fails on 1st slot
1.373 + int m = max.get();
1.374 + if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
1.375 + index = m + 1; // Grow on 3rd failed slot
1.376 + else if (--index < 0)
1.377 + index = m; // Circularly traverse
1.378 + }
1.379 + }
1.380 + }
1.381 +
1.382 + /**
1.383 + * Returns a hash index for the current thread. Uses a one-step
1.384 + * FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
1.385 + * based on the current thread's Thread.getId(). These hash codes
1.386 + * have more uniform distribution properties with respect to small
1.387 + * moduli (here 1-31) than do other simple hashing functions.
1.388 + *
1.389 + * <p>To return an index between 0 and max, we use a cheap
1.390 + * approximation to a mod operation, that also corrects for bias
1.391 + * due to non-power-of-2 remaindering (see {@link
1.392 + * java.util.Random#nextInt}). Bits of the hashcode are masked
1.393 + * with "nbits", the ceiling power of two of table size (looked up
1.394 + * in a table packed into three ints). If too large, this is
1.395 + * retried after rotating the hash by nbits bits, while forcing new
1.396 + * top bit to 0, which guarantees eventual termination (although
1.397 + * with a non-random-bias). This requires an average of less than
1.398 + * 2 tries for all table sizes, and has a maximum 2% difference
1.399 + * from perfectly uniform slot probabilities when applied to all
1.400 + * possible hash codes for sizes less than 32.
1.401 + *
1.402 + * @return a per-thread-random index, 0 <= index < max
1.403 + */
1.404 + private final int hashIndex() {
1.405 + long id = Thread.currentThread().getId();
1.406 + int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
1.407 +
1.408 + int m = max.get();
1.409 + int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1))
1.410 + ((0x000001f8 >>> m) & 2) | // The constants hold
1.411 + ((0xffff00f2 >>> m) & 1)); // a lookup table
1.412 + int index;
1.413 + while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on
1.414 + hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
1.415 + return index;
1.416 + }
1.417 +
1.418 + /**
1.419 + * Creates a new slot at given index. Called only when the slot
1.420 + * appears to be null. Relies on double-check using builtin
1.421 + * locks, since they rarely contend. This in turn relies on the
1.422 + * arena array being declared volatile.
1.423 + *
1.424 + * @param index the index to add slot at
1.425 + */
1.426 + private void createSlot(int index) {
1.427 + // Create slot outside of lock to narrow sync region
1.428 + Slot newSlot = new Slot();
1.429 + Slot[] a = arena;
1.430 + synchronized (a) {
1.431 + if (a[index] == null)
1.432 + a[index] = newSlot;
1.433 + }
1.434 + }
1.435 +
1.436 + /**
1.437 + * Tries to cancel a wait for the given node waiting in the given
1.438 + * slot, if so, helping clear the node from its slot to avoid
1.439 + * garbage retention.
1.440 + *
1.441 + * @param node the waiting node
1.442 + * @param slot the slot it is waiting in
1.443 + * @return true if successfully cancelled
1.444 + */
1.445 + private static boolean tryCancel(Node node, Slot slot) {
1.446 + if (!node.compareAndSet(null, CANCEL))
1.447 + return false;
1.448 + if (slot.get() == node) // pre-check to minimize contention
1.449 + slot.compareAndSet(node, null);
1.450 + return true;
1.451 + }
1.452 +
1.453 + // Three forms of waiting. Each just different enough not to merge
1.454 + // code with others.
1.455 +
1.456 + /**
1.457 + * Spin-waits for hole for a non-0 slot. Fails if spin elapses
1.458 + * before hole filled. Does not check interrupt, relying on check
1.459 + * in public exchange method to abort if interrupted on entry.
1.460 + *
1.461 + * @param node the waiting node
1.462 + * @return on success, the hole; on failure, CANCEL
1.463 + */
1.464 + private static Object spinWait(Node node, Slot slot) {
1.465 + int spins = SPINS;
1.466 + for (;;) {
1.467 + Object v = node.get();
1.468 + if (v != null)
1.469 + return v;
1.470 + else if (spins > 0)
1.471 + --spins;
1.472 + else
1.473 + tryCancel(node, slot);
1.474 + }
1.475 + }
1.476 +
1.477 + /**
1.478 + * Waits for (by spinning and/or blocking) and gets the hole
1.479 + * filled in by another thread. Fails if interrupted before
1.480 + * hole filled.
1.481 + *
1.482 + * When a node/thread is about to block, it sets its waiter field
1.483 + * and then rechecks state at least one more time before actually
1.484 + * parking, thus covering race vs fulfiller noticing that waiter
1.485 + * is non-null so should be woken.
1.486 + *
1.487 + * Thread interruption status is checked only surrounding calls to
1.488 + * park. The caller is assumed to have checked interrupt status
1.489 + * on entry.
1.490 + *
1.491 + * @param node the waiting node
1.492 + * @return on success, the hole; on failure, CANCEL
1.493 + */
1.494 + private static Object await(Node node, Slot slot) {
1.495 + Thread w = Thread.currentThread();
1.496 + int spins = SPINS;
1.497 + for (;;) {
1.498 + Object v = node.get();
1.499 + if (v != null)
1.500 + return v;
1.501 + else if (spins > 0) // Spin-wait phase
1.502 + --spins;
1.503 + else if (node.waiter == null) // Set up to block next
1.504 + node.waiter = w;
1.505 + else if (w.isInterrupted()) // Abort on interrupt
1.506 + tryCancel(node, slot);
1.507 + else // Block
1.508 + LockSupport.park(node);
1.509 + }
1.510 + }
1.511 +
1.512 + /**
1.513 + * Waits for (at index 0) and gets the hole filled in by another
1.514 + * thread. Fails if timed out or interrupted before hole filled.
1.515 + * Same basic logic as untimed version, but a bit messier.
1.516 + *
1.517 + * @param node the waiting node
1.518 + * @param nanos the wait time
1.519 + * @return on success, the hole; on failure, CANCEL
1.520 + */
1.521 + private Object awaitNanos(Node node, Slot slot, long nanos) {
1.522 + int spins = TIMED_SPINS;
1.523 + long lastTime = 0;
1.524 + Thread w = null;
1.525 + for (;;) {
1.526 + Object v = node.get();
1.527 + if (v != null)
1.528 + return v;
1.529 + long now = System.nanoTime();
1.530 + if (w == null)
1.531 + w = Thread.currentThread();
1.532 + else
1.533 + nanos -= now - lastTime;
1.534 + lastTime = now;
1.535 + if (nanos > 0) {
1.536 + if (spins > 0)
1.537 + --spins;
1.538 + else if (node.waiter == null)
1.539 + node.waiter = w;
1.540 + else if (w.isInterrupted())
1.541 + tryCancel(node, slot);
1.542 + else
1.543 + LockSupport.parkNanos(node, nanos);
1.544 + }
1.545 + else if (tryCancel(node, slot) && !w.isInterrupted())
1.546 + return scanOnTimeout(node);
1.547 + }
1.548 + }
1.549 +
1.550 + /**
1.551 + * Sweeps through arena checking for any waiting threads. Called
1.552 + * only upon return from timeout while waiting in slot 0. When a
1.553 + * thread gives up on a timed wait, it is possible that a
1.554 + * previously-entered thread is still waiting in some other
1.555 + * slot. So we scan to check for any. This is almost always
1.556 + * overkill, but decreases the likelihood of timeouts when there
1.557 + * are other threads present to far less than that in lock-based
1.558 + * exchangers in which earlier-arriving threads may still be
1.559 + * waiting on entry locks.
1.560 + *
1.561 + * @param node the waiting node
1.562 + * @return another thread's item, or CANCEL
1.563 + */
1.564 + private Object scanOnTimeout(Node node) {
1.565 + Object y;
1.566 + for (int j = arena.length - 1; j >= 0; --j) {
1.567 + Slot slot = arena[j];
1.568 + if (slot != null) {
1.569 + while ((y = slot.get()) != null) {
1.570 + if (slot.compareAndSet(y, null)) {
1.571 + Node you = (Node)y;
1.572 + if (you.compareAndSet(null, node.item)) {
1.573 + LockSupport.unpark(you.waiter);
1.574 + return you.item;
1.575 + }
1.576 + }
1.577 + }
1.578 + }
1.579 + }
1.580 + return CANCEL;
1.581 + }
1.582 +
1.583 + /**
1.584 + * Creates a new Exchanger.
1.585 + */
1.586 + public Exchanger() {
1.587 + }
1.588 +
1.589 + /**
1.590 + * Waits for another thread to arrive at this exchange point (unless
1.591 + * the current thread is {@linkplain Thread#interrupt interrupted}),
1.592 + * and then transfers the given object to it, receiving its object
1.593 + * in return.
1.594 + *
1.595 + * <p>If another thread is already waiting at the exchange point then
1.596 + * it is resumed for thread scheduling purposes and receives the object
1.597 + * passed in by the current thread. The current thread returns immediately,
1.598 + * receiving the object passed to the exchange by that other thread.
1.599 + *
1.600 + * <p>If no other thread is already waiting at the exchange then the
1.601 + * current thread is disabled for thread scheduling purposes and lies
1.602 + * dormant until one of two things happens:
1.603 + * <ul>
1.604 + * <li>Some other thread enters the exchange; or
1.605 + * <li>Some other thread {@linkplain Thread#interrupt interrupts}
1.606 + * the current thread.
1.607 + * </ul>
1.608 + * <p>If the current thread:
1.609 + * <ul>
1.610 + * <li>has its interrupted status set on entry to this method; or
1.611 + * <li>is {@linkplain Thread#interrupt interrupted} while waiting
1.612 + * for the exchange,
1.613 + * </ul>
1.614 + * then {@link InterruptedException} is thrown and the current thread's
1.615 + * interrupted status is cleared.
1.616 + *
1.617 + * @param x the object to exchange
1.618 + * @return the object provided by the other thread
1.619 + * @throws InterruptedException if the current thread was
1.620 + * interrupted while waiting
1.621 + */
1.622 + public V exchange(V x) throws InterruptedException {
1.623 + if (!Thread.interrupted()) {
1.624 + Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
1.625 + if (v == NULL_ITEM)
1.626 + return null;
1.627 + if (v != CANCEL)
1.628 + return (V)v;
1.629 + Thread.interrupted(); // Clear interrupt status on IE throw
1.630 + }
1.631 + throw new InterruptedException();
1.632 + }
1.633 +
1.634 + /**
1.635 + * Waits for another thread to arrive at this exchange point (unless
1.636 + * the current thread is {@linkplain Thread#interrupt interrupted} or
1.637 + * the specified waiting time elapses), and then transfers the given
1.638 + * object to it, receiving its object in return.
1.639 + *
1.640 + * <p>If another thread is already waiting at the exchange point then
1.641 + * it is resumed for thread scheduling purposes and receives the object
1.642 + * passed in by the current thread. The current thread returns immediately,
1.643 + * receiving the object passed to the exchange by that other thread.
1.644 + *
1.645 + * <p>If no other thread is already waiting at the exchange then the
1.646 + * current thread is disabled for thread scheduling purposes and lies
1.647 + * dormant until one of three things happens:
1.648 + * <ul>
1.649 + * <li>Some other thread enters the exchange; or
1.650 + * <li>Some other thread {@linkplain Thread#interrupt interrupts}
1.651 + * the current thread; or
1.652 + * <li>The specified waiting time elapses.
1.653 + * </ul>
1.654 + * <p>If the current thread:
1.655 + * <ul>
1.656 + * <li>has its interrupted status set on entry to this method; or
1.657 + * <li>is {@linkplain Thread#interrupt interrupted} while waiting
1.658 + * for the exchange,
1.659 + * </ul>
1.660 + * then {@link InterruptedException} is thrown and the current thread's
1.661 + * interrupted status is cleared.
1.662 + *
1.663 + * <p>If the specified waiting time elapses then {@link
1.664 + * TimeoutException} is thrown. If the time is less than or equal
1.665 + * to zero, the method will not wait at all.
1.666 + *
1.667 + * @param x the object to exchange
1.668 + * @param timeout the maximum time to wait
1.669 + * @param unit the time unit of the <tt>timeout</tt> argument
1.670 + * @return the object provided by the other thread
1.671 + * @throws InterruptedException if the current thread was
1.672 + * interrupted while waiting
1.673 + * @throws TimeoutException if the specified waiting time elapses
1.674 + * before another thread enters the exchange
1.675 + */
1.676 + public V exchange(V x, long timeout, TimeUnit unit)
1.677 + throws InterruptedException, TimeoutException {
1.678 + if (!Thread.interrupted()) {
1.679 + Object v = doExchange((x == null) ? NULL_ITEM : x,
1.680 + true, unit.toNanos(timeout));
1.681 + if (v == NULL_ITEM)
1.682 + return null;
1.683 + if (v != CANCEL)
1.684 + return (V)v;
1.685 + if (!Thread.interrupted())
1.686 + throw new TimeoutException();
1.687 + }
1.688 + throw new InterruptedException();
1.689 + }
1.690 +}