1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/Phaser.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,1152 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +
1.41 +import java.util.concurrent.TimeUnit;
1.42 +import java.util.concurrent.TimeoutException;
1.43 +import java.util.concurrent.atomic.AtomicReference;
1.44 +import java.util.concurrent.locks.LockSupport;
1.45 +
1.46 +/**
1.47 + * A reusable synchronization barrier, similar in functionality to
1.48 + * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
1.49 + * {@link java.util.concurrent.CountDownLatch CountDownLatch}
1.50 + * but supporting more flexible usage.
1.51 + *
1.52 + * <p> <b>Registration.</b> Unlike the case for other barriers, the
1.53 + * number of parties <em>registered</em> to synchronize on a phaser
1.54 + * may vary over time. Tasks may be registered at any time (using
1.55 + * methods {@link #register}, {@link #bulkRegister}, or forms of
1.56 + * constructors establishing initial numbers of parties), and
1.57 + * optionally deregistered upon any arrival (using {@link
1.58 + * #arriveAndDeregister}). As is the case with most basic
1.59 + * synchronization constructs, registration and deregistration affect
1.60 + * only internal counts; they do not establish any further internal
1.61 + * bookkeeping, so tasks cannot query whether they are registered.
1.62 + * (However, you can introduce such bookkeeping by subclassing this
1.63 + * class.)
1.64 + *
1.65 + * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
1.66 + * Phaser} may be repeatedly awaited. Method {@link
1.67 + * #arriveAndAwaitAdvance} has effect analogous to {@link
1.68 + * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
1.69 + * generation of a phaser has an associated phase number. The phase
1.70 + * number starts at zero, and advances when all parties arrive at the
1.71 + * phaser, wrapping around to zero after reaching {@code
1.72 + * Integer.MAX_VALUE}. The use of phase numbers enables independent
1.73 + * control of actions upon arrival at a phaser and upon awaiting
1.74 + * others, via two kinds of methods that may be invoked by any
1.75 + * registered party:
1.76 + *
1.77 + * <ul>
1.78 + *
1.79 + * <li> <b>Arrival.</b> Methods {@link #arrive} and
1.80 + * {@link #arriveAndDeregister} record arrival. These methods
1.81 + * do not block, but return an associated <em>arrival phase
1.82 + * number</em>; that is, the phase number of the phaser to which
1.83 + * the arrival applied. When the final party for a given phase
1.84 + * arrives, an optional action is performed and the phase
1.85 + * advances. These actions are performed by the party
1.86 + * triggering a phase advance, and are arranged by overriding
1.87 + * method {@link #onAdvance(int, int)}, which also controls
1.88 + * termination. Overriding this method is similar to, but more
1.89 + * flexible than, providing a barrier action to a {@code
1.90 + * CyclicBarrier}.
1.91 + *
1.92 + * <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
1.93 + * argument indicating an arrival phase number, and returns when
1.94 + * the phaser advances to (or is already at) a different phase.
1.95 + * Unlike similar constructions using {@code CyclicBarrier},
1.96 + * method {@code awaitAdvance} continues to wait even if the
1.97 + * waiting thread is interrupted. Interruptible and timeout
1.98 + * versions are also available, but exceptions encountered while
1.99 + * tasks wait interruptibly or with timeout do not change the
1.100 + * state of the phaser. If necessary, you can perform any
1.101 + * associated recovery within handlers of those exceptions,
1.102 + * often after invoking {@code forceTermination}. Phasers may
1.103 + * also be used by tasks executing in a {@link ForkJoinPool},
1.104 + * which will ensure sufficient parallelism to execute tasks
1.105 + * when others are blocked waiting for a phase to advance.
1.106 + *
1.107 + * </ul>
1.108 + *
1.109 + * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
1.110 + * state, that may be checked using method {@link #isTerminated}. Upon
1.111 + * termination, all synchronization methods immediately return without
1.112 + * waiting for advance, as indicated by a negative return value.
1.113 + * Similarly, attempts to register upon termination have no effect.
1.114 + * Termination is triggered when an invocation of {@code onAdvance}
1.115 + * returns {@code true}. The default implementation returns {@code
1.116 + * true} if a deregistration has caused the number of registered
1.117 + * parties to become zero. As illustrated below, when phasers control
1.118 + * actions with a fixed number of iterations, it is often convenient
1.119 + * to override this method to cause termination when the current phase
1.120 + * number reaches a threshold. Method {@link #forceTermination} is
1.121 + * also available to abruptly release waiting threads and allow them
1.122 + * to terminate.
1.123 + *
1.124 + * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
1.125 + * constructed in tree structures) to reduce contention. Phasers with
1.126 + * large numbers of parties that would otherwise experience heavy
1.127 + * synchronization contention costs may instead be set up so that
1.128 + * groups of sub-phasers share a common parent. This may greatly
1.129 + * increase throughput even though it incurs greater per-operation
1.130 + * overhead.
1.131 + *
1.132 + * <p>In a tree of tiered phasers, registration and deregistration of
1.133 + * child phasers with their parent are managed automatically.
1.134 + * Whenever the number of registered parties of a child phaser becomes
1.135 + * non-zero (as established in the {@link #Phaser(Phaser,int)}
1.136 + * constructor, {@link #register}, or {@link #bulkRegister}), the
1.137 + * child phaser is registered with its parent. Whenever the number of
1.138 + * registered parties becomes zero as the result of an invocation of
1.139 + * {@link #arriveAndDeregister}, the child phaser is deregistered
1.140 + * from its parent.
1.141 + *
1.142 + * <p><b>Monitoring.</b> While synchronization methods may be invoked
1.143 + * only by registered parties, the current state of a phaser may be
1.144 + * monitored by any caller. At any given moment there are {@link
1.145 + * #getRegisteredParties} parties in total, of which {@link
1.146 + * #getArrivedParties} have arrived at the current phase ({@link
1.147 + * #getPhase}). When the remaining ({@link #getUnarrivedParties})
1.148 + * parties arrive, the phase advances. The values returned by these
1.149 + * methods may reflect transient states and so are not in general
1.150 + * useful for synchronization control. Method {@link #toString}
1.151 + * returns snapshots of these state queries in a form convenient for
1.152 + * informal monitoring.
1.153 + *
1.154 + * <p><b>Sample usages:</b>
1.155 + *
1.156 + * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
1.157 + * to control a one-shot action serving a variable number of parties.
1.158 + * The typical idiom is for the method setting this up to first
1.159 + * register, then start the actions, then deregister, as in:
1.160 + *
1.161 + * <pre> {@code
1.162 + * void runTasks(List<Runnable> tasks) {
1.163 + * final Phaser phaser = new Phaser(1); // "1" to register self
1.164 + * // create and start threads
1.165 + * for (final Runnable task : tasks) {
1.166 + * phaser.register();
1.167 + * new Thread() {
1.168 + * public void run() {
1.169 + * phaser.arriveAndAwaitAdvance(); // await all creation
1.170 + * task.run();
1.171 + * }
1.172 + * }.start();
1.173 + * }
1.174 + *
1.175 + * // allow threads to start and deregister self
1.176 + * phaser.arriveAndDeregister();
1.177 + * }}</pre>
1.178 + *
1.179 + * <p>One way to cause a set of threads to repeatedly perform actions
1.180 + * for a given number of iterations is to override {@code onAdvance}:
1.181 + *
1.182 + * <pre> {@code
1.183 + * void startTasks(List<Runnable> tasks, final int iterations) {
1.184 + * final Phaser phaser = new Phaser() {
1.185 + * protected boolean onAdvance(int phase, int registeredParties) {
1.186 + * return phase >= iterations || registeredParties == 0;
1.187 + * }
1.188 + * };
1.189 + * phaser.register();
1.190 + * for (final Runnable task : tasks) {
1.191 + * phaser.register();
1.192 + * new Thread() {
1.193 + * public void run() {
1.194 + * do {
1.195 + * task.run();
1.196 + * phaser.arriveAndAwaitAdvance();
1.197 + * } while (!phaser.isTerminated());
1.198 + * }
1.199 + * }.start();
1.200 + * }
1.201 + * phaser.arriveAndDeregister(); // deregister self, don't wait
1.202 + * }}</pre>
1.203 + *
1.204 + * If the main task must later await termination, it
1.205 + * may re-register and then execute a similar loop:
1.206 + * <pre> {@code
1.207 + * // ...
1.208 + * phaser.register();
1.209 + * while (!phaser.isTerminated())
1.210 + * phaser.arriveAndAwaitAdvance();}</pre>
1.211 + *
1.212 + * <p>Related constructions may be used to await particular phase numbers
1.213 + * in contexts where you are sure that the phase will never wrap around
1.214 + * {@code Integer.MAX_VALUE}. For example:
1.215 + *
1.216 + * <pre> {@code
1.217 + * void awaitPhase(Phaser phaser, int phase) {
1.218 + * int p = phaser.register(); // assumes caller not already registered
1.219 + * while (p < phase) {
1.220 + * if (phaser.isTerminated())
1.221 + * // ... deal with unexpected termination
1.222 + * else
1.223 + * p = phaser.arriveAndAwaitAdvance();
1.224 + * }
1.225 + * phaser.arriveAndDeregister();
1.226 + * }}</pre>
1.227 + *
1.228 + *
1.229 + * <p>To create a set of {@code n} tasks using a tree of phasers, you
1.230 + * could use code of the following form, assuming a Task class with a
1.231 + * constructor accepting a {@code Phaser} that it registers with upon
1.232 + * construction. After invocation of {@code build(new Task[n], 0, n,
1.233 + * new Phaser())}, these tasks could then be started, for example by
1.234 + * submitting to a pool:
1.235 + *
1.236 + * <pre> {@code
1.237 + * void build(Task[] tasks, int lo, int hi, Phaser ph) {
1.238 + * if (hi - lo > TASKS_PER_PHASER) {
1.239 + * for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
1.240 + * int j = Math.min(i + TASKS_PER_PHASER, hi);
1.241 + * build(tasks, i, j, new Phaser(ph));
1.242 + * }
1.243 + * } else {
1.244 + * for (int i = lo; i < hi; ++i)
1.245 + * tasks[i] = new Task(ph);
1.246 + * // assumes new Task(ph) performs ph.register()
1.247 + * }
1.248 + * }}</pre>
1.249 + *
1.250 + * The best value of {@code TASKS_PER_PHASER} depends mainly on
1.251 + * expected synchronization rates. A value as low as four may
1.252 + * be appropriate for extremely small per-phase task bodies (thus
1.253 + * high rates), or up to hundreds for extremely large ones.
1.254 + *
1.255 + * <p><b>Implementation notes</b>: This implementation restricts the
1.256 + * maximum number of parties to 65535. Attempts to register additional
1.257 + * parties result in {@code IllegalStateException}. However, you can and
1.258 + * should create tiered phasers to accommodate arbitrarily large sets
1.259 + * of participants.
1.260 + *
1.261 + * @since 1.7
1.262 + * @author Doug Lea
1.263 + */
1.264 +public class Phaser {
1.265 + /*
1.266 + * This class implements an extension of X10 "clocks". Thanks to
1.267 + * Vijay Saraswat for the idea, and to Vivek Sarkar for
1.268 + * enhancements to extend functionality.
1.269 + */
1.270 +
1.271 + /**
1.272 + * Primary state representation, holding four bit-fields:
1.273 + *
1.274 + * unarrived -- the number of parties yet to hit barrier (bits 0-15)
1.275 + * parties -- the number of parties to wait (bits 16-31)
1.276 + * phase -- the generation of the barrier (bits 32-62)
1.277 + * terminated -- set if barrier is terminated (bit 63 / sign)
1.278 + *
1.279 + * Except that a phaser with no registered parties is
1.280 + * distinguished by the otherwise illegal state of having zero
1.281 + * parties and one unarrived parties (encoded as EMPTY below).
1.282 + *
1.283 + * To efficiently maintain atomicity, these values are packed into
1.284 + * a single (atomic) long. Good performance relies on keeping
1.285 + * state decoding and encoding simple, and keeping race windows
1.286 + * short.
1.287 + *
1.288 + * All state updates are performed via CAS except initial
1.289 + * registration of a sub-phaser (i.e., one with a non-null
1.290 + * parent). In this (relatively rare) case, we use built-in
1.291 + * synchronization to lock while first registering with its
1.292 + * parent.
1.293 + *
1.294 + * The phase of a subphaser is allowed to lag that of its
1.295 + * ancestors until it is actually accessed -- see method
1.296 + * reconcileState.
1.297 + */
1.298 + private volatile long state;
1.299 +
1.300 + private static final int MAX_PARTIES = 0xffff;
1.301 + private static final int MAX_PHASE = Integer.MAX_VALUE;
1.302 + private static final int PARTIES_SHIFT = 16;
1.303 + private static final int PHASE_SHIFT = 32;
1.304 + private static final int UNARRIVED_MASK = 0xffff; // to mask ints
1.305 + private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
1.306 + private static final long TERMINATION_BIT = 1L << 63;
1.307 +
1.308 + // some special values
1.309 + private static final int ONE_ARRIVAL = 1;
1.310 + private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
1.311 + private static final int EMPTY = 1;
1.312 +
1.313 + // The following unpacking methods are usually manually inlined
1.314 +
1.315 + private static int unarrivedOf(long s) {
1.316 + int counts = (int)s;
1.317 + return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
1.318 + }
1.319 +
1.320 + private static int partiesOf(long s) {
1.321 + return (int)s >>> PARTIES_SHIFT;
1.322 + }
1.323 +
1.324 + private static int phaseOf(long s) {
1.325 + return (int)(s >>> PHASE_SHIFT);
1.326 + }
1.327 +
1.328 + private static int arrivedOf(long s) {
1.329 + int counts = (int)s;
1.330 + return (counts == EMPTY) ? 0 :
1.331 + (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
1.332 + }
1.333 +
1.334 + /**
1.335 + * The parent of this phaser, or null if none
1.336 + */
1.337 + private final Phaser parent;
1.338 +
1.339 + /**
1.340 + * The root of phaser tree. Equals this if not in a tree.
1.341 + */
1.342 + private final Phaser root;
1.343 +
1.344 + /**
1.345 + * Heads of Treiber stacks for waiting threads. To eliminate
1.346 + * contention when releasing some threads while adding others, we
1.347 + * use two of them, alternating across even and odd phases.
1.348 + * Subphasers share queues with root to speed up releases.
1.349 + */
1.350 + private final AtomicReference<QNode> evenQ;
1.351 + private final AtomicReference<QNode> oddQ;
1.352 +
1.353 + private AtomicReference<QNode> queueFor(int phase) {
1.354 + return ((phase & 1) == 0) ? evenQ : oddQ;
1.355 + }
1.356 +
1.357 + /**
1.358 + * Returns message string for bounds exceptions on arrival.
1.359 + */
1.360 + private String badArrive(long s) {
1.361 + return "Attempted arrival of unregistered party for " +
1.362 + stateToString(s);
1.363 + }
1.364 +
1.365 + /**
1.366 + * Returns message string for bounds exceptions on registration.
1.367 + */
1.368 + private String badRegister(long s) {
1.369 + return "Attempt to register more than " +
1.370 + MAX_PARTIES + " parties for " + stateToString(s);
1.371 + }
1.372 +
1.373 + /**
1.374 + * Main implementation for methods arrive and arriveAndDeregister.
1.375 + * Manually tuned to speed up and minimize race windows for the
1.376 + * common case of just decrementing unarrived field.
1.377 + *
1.378 + * @param deregister false for arrive, true for arriveAndDeregister
1.379 + */
1.380 + private int doArrive(boolean deregister) {
1.381 + int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
1.382 + final Phaser root = this.root;
1.383 + for (;;) {
1.384 + long s = (root == this) ? state : reconcileState();
1.385 + int phase = (int)(s >>> PHASE_SHIFT);
1.386 + int counts = (int)s;
1.387 + int unarrived = (counts & UNARRIVED_MASK) - 1;
1.388 + if (phase < 0)
1.389 + return phase;
1.390 + else if (counts == EMPTY || unarrived < 0) {
1.391 + if (root == this || reconcileState() == s)
1.392 + throw new IllegalStateException(badArrive(s));
1.393 + }
1.394 + else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
1.395 + if (unarrived == 0) {
1.396 + long n = s & PARTIES_MASK; // base of next state
1.397 + int nextUnarrived = (int)n >>> PARTIES_SHIFT;
1.398 + if (root != this)
1.399 + return parent.doArrive(nextUnarrived == 0);
1.400 + if (onAdvance(phase, nextUnarrived))
1.401 + n |= TERMINATION_BIT;
1.402 + else if (nextUnarrived == 0)
1.403 + n |= EMPTY;
1.404 + else
1.405 + n |= nextUnarrived;
1.406 + n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
1.407 + UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
1.408 + releaseWaiters(phase);
1.409 + }
1.410 + return phase;
1.411 + }
1.412 + }
1.413 + }
1.414 +
1.415 + /**
1.416 + * Implementation of register, bulkRegister
1.417 + *
1.418 + * @param registrations number to add to both parties and
1.419 + * unarrived fields. Must be greater than zero.
1.420 + */
1.421 + private int doRegister(int registrations) {
1.422 + // adjustment to state
1.423 + long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
1.424 + final Phaser parent = this.parent;
1.425 + int phase;
1.426 + for (;;) {
1.427 + long s = state;
1.428 + int counts = (int)s;
1.429 + int parties = counts >>> PARTIES_SHIFT;
1.430 + int unarrived = counts & UNARRIVED_MASK;
1.431 + if (registrations > MAX_PARTIES - parties)
1.432 + throw new IllegalStateException(badRegister(s));
1.433 + else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
1.434 + break;
1.435 + else if (counts != EMPTY) { // not 1st registration
1.436 + if (parent == null || reconcileState() == s) {
1.437 + if (unarrived == 0) // wait out advance
1.438 + root.internalAwaitAdvance(phase, null);
1.439 + else if (UNSAFE.compareAndSwapLong(this, stateOffset,
1.440 + s, s + adj))
1.441 + break;
1.442 + }
1.443 + }
1.444 + else if (parent == null) { // 1st root registration
1.445 + long next = ((long)phase << PHASE_SHIFT) | adj;
1.446 + if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
1.447 + break;
1.448 + }
1.449 + else {
1.450 + synchronized (this) { // 1st sub registration
1.451 + if (state == s) { // recheck under lock
1.452 + parent.doRegister(1);
1.453 + do { // force current phase
1.454 + phase = (int)(root.state >>> PHASE_SHIFT);
1.455 + // assert phase < 0 || (int)state == EMPTY;
1.456 + } while (!UNSAFE.compareAndSwapLong
1.457 + (this, stateOffset, state,
1.458 + ((long)phase << PHASE_SHIFT) | adj));
1.459 + break;
1.460 + }
1.461 + }
1.462 + }
1.463 + }
1.464 + return phase;
1.465 + }
1.466 +
1.467 + /**
1.468 + * Resolves lagged phase propagation from root if necessary.
1.469 + * Reconciliation normally occurs when root has advanced but
1.470 + * subphasers have not yet done so, in which case they must finish
1.471 + * their own advance by setting unarrived to parties (or if
1.472 + * parties is zero, resetting to unregistered EMPTY state).
1.473 + * However, this method may also be called when "floating"
1.474 + * subphasers with possibly some unarrived parties are merely
1.475 + * catching up to current phase, in which case counts are
1.476 + * unaffected.
1.477 + *
1.478 + * @return reconciled state
1.479 + */
1.480 + private long reconcileState() {
1.481 + final Phaser root = this.root;
1.482 + long s = state;
1.483 + if (root != this) {
1.484 + int phase, u, p;
1.485 + // CAS root phase with current parties; possibly trip unarrived
1.486 + while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
1.487 + (int)(s >>> PHASE_SHIFT) &&
1.488 + !UNSAFE.compareAndSwapLong
1.489 + (this, stateOffset, s,
1.490 + s = (((long)phase << PHASE_SHIFT) |
1.491 + (s & PARTIES_MASK) |
1.492 + ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
1.493 + (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
1.494 + s = state;
1.495 + }
1.496 + return s;
1.497 + }
1.498 +
1.499 + /**
1.500 + * Creates a new phaser with no initially registered parties, no
1.501 + * parent, and initial phase number 0. Any thread using this
1.502 + * phaser will need to first register for it.
1.503 + */
1.504 + public Phaser() {
1.505 + this(null, 0);
1.506 + }
1.507 +
1.508 + /**
1.509 + * Creates a new phaser with the given number of registered
1.510 + * unarrived parties, no parent, and initial phase number 0.
1.511 + *
1.512 + * @param parties the number of parties required to advance to the
1.513 + * next phase
1.514 + * @throws IllegalArgumentException if parties less than zero
1.515 + * or greater than the maximum number of parties supported
1.516 + */
1.517 + public Phaser(int parties) {
1.518 + this(null, parties);
1.519 + }
1.520 +
1.521 + /**
1.522 + * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
1.523 + *
1.524 + * @param parent the parent phaser
1.525 + */
1.526 + public Phaser(Phaser parent) {
1.527 + this(parent, 0);
1.528 + }
1.529 +
1.530 + /**
1.531 + * Creates a new phaser with the given parent and number of
1.532 + * registered unarrived parties. When the given parent is non-null
1.533 + * and the given number of parties is greater than zero, this
1.534 + * child phaser is registered with its parent.
1.535 + *
1.536 + * @param parent the parent phaser
1.537 + * @param parties the number of parties required to advance to the
1.538 + * next phase
1.539 + * @throws IllegalArgumentException if parties less than zero
1.540 + * or greater than the maximum number of parties supported
1.541 + */
1.542 + public Phaser(Phaser parent, int parties) {
1.543 + if (parties >>> PARTIES_SHIFT != 0)
1.544 + throw new IllegalArgumentException("Illegal number of parties");
1.545 + int phase = 0;
1.546 + this.parent = parent;
1.547 + if (parent != null) {
1.548 + final Phaser root = parent.root;
1.549 + this.root = root;
1.550 + this.evenQ = root.evenQ;
1.551 + this.oddQ = root.oddQ;
1.552 + if (parties != 0)
1.553 + phase = parent.doRegister(1);
1.554 + }
1.555 + else {
1.556 + this.root = this;
1.557 + this.evenQ = new AtomicReference<QNode>();
1.558 + this.oddQ = new AtomicReference<QNode>();
1.559 + }
1.560 + this.state = (parties == 0) ? (long)EMPTY :
1.561 + ((long)phase << PHASE_SHIFT) |
1.562 + ((long)parties << PARTIES_SHIFT) |
1.563 + ((long)parties);
1.564 + }
1.565 +
1.566 + /**
1.567 + * Adds a new unarrived party to this phaser. If an ongoing
1.568 + * invocation of {@link #onAdvance} is in progress, this method
1.569 + * may await its completion before returning. If this phaser has
1.570 + * a parent, and this phaser previously had no registered parties,
1.571 + * this child phaser is also registered with its parent. If
1.572 + * this phaser is terminated, the attempt to register has
1.573 + * no effect, and a negative value is returned.
1.574 + *
1.575 + * @return the arrival phase number to which this registration
1.576 + * applied. If this value is negative, then this phaser has
1.577 + * terminated, in which case registration has no effect.
1.578 + * @throws IllegalStateException if attempting to register more
1.579 + * than the maximum supported number of parties
1.580 + */
1.581 + public int register() {
1.582 + return doRegister(1);
1.583 + }
1.584 +
1.585 + /**
1.586 + * Adds the given number of new unarrived parties to this phaser.
1.587 + * If an ongoing invocation of {@link #onAdvance} is in progress,
1.588 + * this method may await its completion before returning. If this
1.589 + * phaser has a parent, and the given number of parties is greater
1.590 + * than zero, and this phaser previously had no registered
1.591 + * parties, this child phaser is also registered with its parent.
1.592 + * If this phaser is terminated, the attempt to register has no
1.593 + * effect, and a negative value is returned.
1.594 + *
1.595 + * @param parties the number of additional parties required to
1.596 + * advance to the next phase
1.597 + * @return the arrival phase number to which this registration
1.598 + * applied. If this value is negative, then this phaser has
1.599 + * terminated, in which case registration has no effect.
1.600 + * @throws IllegalStateException if attempting to register more
1.601 + * than the maximum supported number of parties
1.602 + * @throws IllegalArgumentException if {@code parties < 0}
1.603 + */
1.604 + public int bulkRegister(int parties) {
1.605 + if (parties < 0)
1.606 + throw new IllegalArgumentException();
1.607 + if (parties == 0)
1.608 + return getPhase();
1.609 + return doRegister(parties);
1.610 + }
1.611 +
1.612 + /**
1.613 + * Arrives at this phaser, without waiting for others to arrive.
1.614 + *
1.615 + * <p>It is a usage error for an unregistered party to invoke this
1.616 + * method. However, this error may result in an {@code
1.617 + * IllegalStateException} only upon some subsequent operation on
1.618 + * this phaser, if ever.
1.619 + *
1.620 + * @return the arrival phase number, or a negative value if terminated
1.621 + * @throws IllegalStateException if not terminated and the number
1.622 + * of unarrived parties would become negative
1.623 + */
1.624 + public int arrive() {
1.625 + return doArrive(false);
1.626 + }
1.627 +
1.628 + /**
1.629 + * Arrives at this phaser and deregisters from it without waiting
1.630 + * for others to arrive. Deregistration reduces the number of
1.631 + * parties required to advance in future phases. If this phaser
1.632 + * has a parent, and deregistration causes this phaser to have
1.633 + * zero parties, this phaser is also deregistered from its parent.
1.634 + *
1.635 + * <p>It is a usage error for an unregistered party to invoke this
1.636 + * method. However, this error may result in an {@code
1.637 + * IllegalStateException} only upon some subsequent operation on
1.638 + * this phaser, if ever.
1.639 + *
1.640 + * @return the arrival phase number, or a negative value if terminated
1.641 + * @throws IllegalStateException if not terminated and the number
1.642 + * of registered or unarrived parties would become negative
1.643 + */
1.644 + public int arriveAndDeregister() {
1.645 + return doArrive(true);
1.646 + }
1.647 +
1.648 + /**
1.649 + * Arrives at this phaser and awaits others. Equivalent in effect
1.650 + * to {@code awaitAdvance(arrive())}. If you need to await with
1.651 + * interruption or timeout, you can arrange this with an analogous
1.652 + * construction using one of the other forms of the {@code
1.653 + * awaitAdvance} method. If instead you need to deregister upon
1.654 + * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
1.655 + *
1.656 + * <p>It is a usage error for an unregistered party to invoke this
1.657 + * method. However, this error may result in an {@code
1.658 + * IllegalStateException} only upon some subsequent operation on
1.659 + * this phaser, if ever.
1.660 + *
1.661 + * @return the arrival phase number, or the (negative)
1.662 + * {@linkplain #getPhase() current phase} if terminated
1.663 + * @throws IllegalStateException if not terminated and the number
1.664 + * of unarrived parties would become negative
1.665 + */
1.666 + public int arriveAndAwaitAdvance() {
1.667 + // Specialization of doArrive+awaitAdvance eliminating some reads/paths
1.668 + final Phaser root = this.root;
1.669 + for (;;) {
1.670 + long s = (root == this) ? state : reconcileState();
1.671 + int phase = (int)(s >>> PHASE_SHIFT);
1.672 + int counts = (int)s;
1.673 + int unarrived = (counts & UNARRIVED_MASK) - 1;
1.674 + if (phase < 0)
1.675 + return phase;
1.676 + else if (counts == EMPTY || unarrived < 0) {
1.677 + if (reconcileState() == s)
1.678 + throw new IllegalStateException(badArrive(s));
1.679 + }
1.680 + else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
1.681 + s -= ONE_ARRIVAL)) {
1.682 + if (unarrived != 0)
1.683 + return root.internalAwaitAdvance(phase, null);
1.684 + if (root != this)
1.685 + return parent.arriveAndAwaitAdvance();
1.686 + long n = s & PARTIES_MASK; // base of next state
1.687 + int nextUnarrived = (int)n >>> PARTIES_SHIFT;
1.688 + if (onAdvance(phase, nextUnarrived))
1.689 + n |= TERMINATION_BIT;
1.690 + else if (nextUnarrived == 0)
1.691 + n |= EMPTY;
1.692 + else
1.693 + n |= nextUnarrived;
1.694 + int nextPhase = (phase + 1) & MAX_PHASE;
1.695 + n |= (long)nextPhase << PHASE_SHIFT;
1.696 + if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
1.697 + return (int)(state >>> PHASE_SHIFT); // terminated
1.698 + releaseWaiters(phase);
1.699 + return nextPhase;
1.700 + }
1.701 + }
1.702 + }
1.703 +
1.704 + /**
1.705 + * Awaits the phase of this phaser to advance from the given phase
1.706 + * value, returning immediately if the current phase is not equal
1.707 + * to the given phase value or this phaser is terminated.
1.708 + *
1.709 + * @param phase an arrival phase number, or negative value if
1.710 + * terminated; this argument is normally the value returned by a
1.711 + * previous call to {@code arrive} or {@code arriveAndDeregister}.
1.712 + * @return the next arrival phase number, or the argument if it is
1.713 + * negative, or the (negative) {@linkplain #getPhase() current phase}
1.714 + * if terminated
1.715 + */
1.716 + public int awaitAdvance(int phase) {
1.717 + final Phaser root = this.root;
1.718 + long s = (root == this) ? state : reconcileState();
1.719 + int p = (int)(s >>> PHASE_SHIFT);
1.720 + if (phase < 0)
1.721 + return phase;
1.722 + if (p == phase)
1.723 + return root.internalAwaitAdvance(phase, null);
1.724 + return p;
1.725 + }
1.726 +
1.727 + /**
1.728 + * Awaits the phase of this phaser to advance from the given phase
1.729 + * value, throwing {@code InterruptedException} if interrupted
1.730 + * while waiting, or returning immediately if the current phase is
1.731 + * not equal to the given phase value or this phaser is
1.732 + * terminated.
1.733 + *
1.734 + * @param phase an arrival phase number, or negative value if
1.735 + * terminated; this argument is normally the value returned by a
1.736 + * previous call to {@code arrive} or {@code arriveAndDeregister}.
1.737 + * @return the next arrival phase number, or the argument if it is
1.738 + * negative, or the (negative) {@linkplain #getPhase() current phase}
1.739 + * if terminated
1.740 + * @throws InterruptedException if thread interrupted while waiting
1.741 + */
1.742 + public int awaitAdvanceInterruptibly(int phase)
1.743 + throws InterruptedException {
1.744 + final Phaser root = this.root;
1.745 + long s = (root == this) ? state : reconcileState();
1.746 + int p = (int)(s >>> PHASE_SHIFT);
1.747 + if (phase < 0)
1.748 + return phase;
1.749 + if (p == phase) {
1.750 + QNode node = new QNode(this, phase, true, false, 0L);
1.751 + p = root.internalAwaitAdvance(phase, node);
1.752 + if (node.wasInterrupted)
1.753 + throw new InterruptedException();
1.754 + }
1.755 + return p;
1.756 + }
1.757 +
1.758 + /**
1.759 + * Awaits the phase of this phaser to advance from the given phase
1.760 + * value or the given timeout to elapse, throwing {@code
1.761 + * InterruptedException} if interrupted while waiting, or
1.762 + * returning immediately if the current phase is not equal to the
1.763 + * given phase value or this phaser is terminated.
1.764 + *
1.765 + * @param phase an arrival phase number, or negative value if
1.766 + * terminated; this argument is normally the value returned by a
1.767 + * previous call to {@code arrive} or {@code arriveAndDeregister}.
1.768 + * @param timeout how long to wait before giving up, in units of
1.769 + * {@code unit}
1.770 + * @param unit a {@code TimeUnit} determining how to interpret the
1.771 + * {@code timeout} parameter
1.772 + * @return the next arrival phase number, or the argument if it is
1.773 + * negative, or the (negative) {@linkplain #getPhase() current phase}
1.774 + * if terminated
1.775 + * @throws InterruptedException if thread interrupted while waiting
1.776 + * @throws TimeoutException if timed out while waiting
1.777 + */
1.778 + public int awaitAdvanceInterruptibly(int phase,
1.779 + long timeout, TimeUnit unit)
1.780 + throws InterruptedException, TimeoutException {
1.781 + long nanos = unit.toNanos(timeout);
1.782 + final Phaser root = this.root;
1.783 + long s = (root == this) ? state : reconcileState();
1.784 + int p = (int)(s >>> PHASE_SHIFT);
1.785 + if (phase < 0)
1.786 + return phase;
1.787 + if (p == phase) {
1.788 + QNode node = new QNode(this, phase, true, true, nanos);
1.789 + p = root.internalAwaitAdvance(phase, node);
1.790 + if (node.wasInterrupted)
1.791 + throw new InterruptedException();
1.792 + else if (p == phase)
1.793 + throw new TimeoutException();
1.794 + }
1.795 + return p;
1.796 + }
1.797 +
1.798 + /**
1.799 + * Forces this phaser to enter termination state. Counts of
1.800 + * registered parties are unaffected. If this phaser is a member
1.801 + * of a tiered set of phasers, then all of the phasers in the set
1.802 + * are terminated. If this phaser is already terminated, this
1.803 + * method has no effect. This method may be useful for
1.804 + * coordinating recovery after one or more tasks encounter
1.805 + * unexpected exceptions.
1.806 + */
1.807 + public void forceTermination() {
1.808 + // Only need to change root state
1.809 + final Phaser root = this.root;
1.810 + long s;
1.811 + while ((s = root.state) >= 0) {
1.812 + if (UNSAFE.compareAndSwapLong(root, stateOffset,
1.813 + s, s | TERMINATION_BIT)) {
1.814 + // signal all threads
1.815 + releaseWaiters(0);
1.816 + releaseWaiters(1);
1.817 + return;
1.818 + }
1.819 + }
1.820 + }
1.821 +
1.822 + /**
1.823 + * Returns the current phase number. The maximum phase number is
1.824 + * {@code Integer.MAX_VALUE}, after which it restarts at
1.825 + * zero. Upon termination, the phase number is negative,
1.826 + * in which case the prevailing phase prior to termination
1.827 + * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
1.828 + *
1.829 + * @return the phase number, or a negative value if terminated
1.830 + */
1.831 + public final int getPhase() {
1.832 + return (int)(root.state >>> PHASE_SHIFT);
1.833 + }
1.834 +
1.835 + /**
1.836 + * Returns the number of parties registered at this phaser.
1.837 + *
1.838 + * @return the number of parties
1.839 + */
1.840 + public int getRegisteredParties() {
1.841 + return partiesOf(state);
1.842 + }
1.843 +
1.844 + /**
1.845 + * Returns the number of registered parties that have arrived at
1.846 + * the current phase of this phaser. If this phaser has terminated,
1.847 + * the returned value is meaningless and arbitrary.
1.848 + *
1.849 + * @return the number of arrived parties
1.850 + */
1.851 + public int getArrivedParties() {
1.852 + return arrivedOf(reconcileState());
1.853 + }
1.854 +
1.855 + /**
1.856 + * Returns the number of registered parties that have not yet
1.857 + * arrived at the current phase of this phaser. If this phaser has
1.858 + * terminated, the returned value is meaningless and arbitrary.
1.859 + *
1.860 + * @return the number of unarrived parties
1.861 + */
1.862 + public int getUnarrivedParties() {
1.863 + return unarrivedOf(reconcileState());
1.864 + }
1.865 +
1.866 + /**
1.867 + * Returns the parent of this phaser, or {@code null} if none.
1.868 + *
1.869 + * @return the parent of this phaser, or {@code null} if none
1.870 + */
1.871 + public Phaser getParent() {
1.872 + return parent;
1.873 + }
1.874 +
1.875 + /**
1.876 + * Returns the root ancestor of this phaser, which is the same as
1.877 + * this phaser if it has no parent.
1.878 + *
1.879 + * @return the root ancestor of this phaser
1.880 + */
1.881 + public Phaser getRoot() {
1.882 + return root;
1.883 + }
1.884 +
1.885 + /**
1.886 + * Returns {@code true} if this phaser has been terminated.
1.887 + *
1.888 + * @return {@code true} if this phaser has been terminated
1.889 + */
1.890 + public boolean isTerminated() {
1.891 + return root.state < 0L;
1.892 + }
1.893 +
1.894 + /**
1.895 + * Overridable method to perform an action upon impending phase
1.896 + * advance, and to control termination. This method is invoked
1.897 + * upon arrival of the party advancing this phaser (when all other
1.898 + * waiting parties are dormant). If this method returns {@code
1.899 + * true}, this phaser will be set to a final termination state
1.900 + * upon advance, and subsequent calls to {@link #isTerminated}
1.901 + * will return true. Any (unchecked) Exception or Error thrown by
1.902 + * an invocation of this method is propagated to the party
1.903 + * attempting to advance this phaser, in which case no advance
1.904 + * occurs.
1.905 + *
1.906 + * <p>The arguments to this method provide the state of the phaser
1.907 + * prevailing for the current transition. The effects of invoking
1.908 + * arrival, registration, and waiting methods on this phaser from
1.909 + * within {@code onAdvance} are unspecified and should not be
1.910 + * relied on.
1.911 + *
1.912 + * <p>If this phaser is a member of a tiered set of phasers, then
1.913 + * {@code onAdvance} is invoked only for its root phaser on each
1.914 + * advance.
1.915 + *
1.916 + * <p>To support the most common use cases, the default
1.917 + * implementation of this method returns {@code true} when the
1.918 + * number of registered parties has become zero as the result of a
1.919 + * party invoking {@code arriveAndDeregister}. You can disable
1.920 + * this behavior, thus enabling continuation upon future
1.921 + * registrations, by overriding this method to always return
1.922 + * {@code false}:
1.923 + *
1.924 + * <pre> {@code
1.925 + * Phaser phaser = new Phaser() {
1.926 + * protected boolean onAdvance(int phase, int parties) { return false; }
1.927 + * }}</pre>
1.928 + *
1.929 + * @param phase the current phase number on entry to this method,
1.930 + * before this phaser is advanced
1.931 + * @param registeredParties the current number of registered parties
1.932 + * @return {@code true} if this phaser should terminate
1.933 + */
1.934 + protected boolean onAdvance(int phase, int registeredParties) {
1.935 + return registeredParties == 0;
1.936 + }
1.937 +
1.938 + /**
1.939 + * Returns a string identifying this phaser, as well as its
1.940 + * state. The state, in brackets, includes the String {@code
1.941 + * "phase = "} followed by the phase number, {@code "parties = "}
1.942 + * followed by the number of registered parties, and {@code
1.943 + * "arrived = "} followed by the number of arrived parties.
1.944 + *
1.945 + * @return a string identifying this phaser, as well as its state
1.946 + */
1.947 + public String toString() {
1.948 + return stateToString(reconcileState());
1.949 + }
1.950 +
1.951 + /**
1.952 + * Implementation of toString and string-based error messages
1.953 + */
1.954 + private String stateToString(long s) {
1.955 + return super.toString() +
1.956 + "[phase = " + phaseOf(s) +
1.957 + " parties = " + partiesOf(s) +
1.958 + " arrived = " + arrivedOf(s) + "]";
1.959 + }
1.960 +
1.961 + // Waiting mechanics
1.962 +
1.963 + /**
1.964 + * Removes and signals threads from queue for phase.
1.965 + */
1.966 + private void releaseWaiters(int phase) {
1.967 + QNode q; // first element of queue
1.968 + Thread t; // its thread
1.969 + AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
1.970 + while ((q = head.get()) != null &&
1.971 + q.phase != (int)(root.state >>> PHASE_SHIFT)) {
1.972 + if (head.compareAndSet(q, q.next) &&
1.973 + (t = q.thread) != null) {
1.974 + q.thread = null;
1.975 + LockSupport.unpark(t);
1.976 + }
1.977 + }
1.978 + }
1.979 +
1.980 + /**
1.981 + * Variant of releaseWaiters that additionally tries to remove any
1.982 + * nodes no longer waiting for advance due to timeout or
1.983 + * interrupt. Currently, nodes are removed only if they are at
1.984 + * head of queue, which suffices to reduce memory footprint in
1.985 + * most usages.
1.986 + *
1.987 + * @return current phase on exit
1.988 + */
1.989 + private int abortWait(int phase) {
1.990 + AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
1.991 + for (;;) {
1.992 + Thread t;
1.993 + QNode q = head.get();
1.994 + int p = (int)(root.state >>> PHASE_SHIFT);
1.995 + if (q == null || ((t = q.thread) != null && q.phase == p))
1.996 + return p;
1.997 + if (head.compareAndSet(q, q.next) && t != null) {
1.998 + q.thread = null;
1.999 + LockSupport.unpark(t);
1.1000 + }
1.1001 + }
1.1002 + }
1.1003 +
1.1004 + /** The number of CPUs, for spin control */
1.1005 + private static final int NCPU = Runtime.getRuntime().availableProcessors();
1.1006 +
1.1007 + /**
1.1008 + * The number of times to spin before blocking while waiting for
1.1009 + * advance, per arrival while waiting. On multiprocessors, fully
1.1010 + * blocking and waking up a large number of threads all at once is
1.1011 + * usually a very slow process, so we use rechargeable spins to
1.1012 + * avoid it when threads regularly arrive: When a thread in
1.1013 + * internalAwaitAdvance notices another arrival before blocking,
1.1014 + * and there appear to be enough CPUs available, it spins
1.1015 + * SPINS_PER_ARRIVAL more times before blocking. The value trades
1.1016 + * off good-citizenship vs big unnecessary slowdowns.
1.1017 + */
1.1018 + static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
1.1019 +
1.1020 + /**
1.1021 + * Possibly blocks and waits for phase to advance unless aborted.
1.1022 + * Call only from root node.
1.1023 + *
1.1024 + * @param phase current phase
1.1025 + * @param node if non-null, the wait node to track interrupt and timeout;
1.1026 + * if null, denotes noninterruptible wait
1.1027 + * @return current phase
1.1028 + */
1.1029 + private int internalAwaitAdvance(int phase, QNode node) {
1.1030 + releaseWaiters(phase-1); // ensure old queue clean
1.1031 + boolean queued = false; // true when node is enqueued
1.1032 + int lastUnarrived = 0; // to increase spins upon change
1.1033 + int spins = SPINS_PER_ARRIVAL;
1.1034 + long s;
1.1035 + int p;
1.1036 + while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
1.1037 + if (node == null) { // spinning in noninterruptible mode
1.1038 + int unarrived = (int)s & UNARRIVED_MASK;
1.1039 + if (unarrived != lastUnarrived &&
1.1040 + (lastUnarrived = unarrived) < NCPU)
1.1041 + spins += SPINS_PER_ARRIVAL;
1.1042 + boolean interrupted = Thread.interrupted();
1.1043 + if (interrupted || --spins < 0) { // need node to record intr
1.1044 + node = new QNode(this, phase, false, false, 0L);
1.1045 + node.wasInterrupted = interrupted;
1.1046 + }
1.1047 + }
1.1048 + else if (node.isReleasable()) // done or aborted
1.1049 + break;
1.1050 + else if (!queued) { // push onto queue
1.1051 + AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
1.1052 + QNode q = node.next = head.get();
1.1053 + if ((q == null || q.phase == phase) &&
1.1054 + (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
1.1055 + queued = head.compareAndSet(q, node);
1.1056 + }
1.1057 + else {
1.1058 + try {
1.1059 + ForkJoinPool.managedBlock(node);
1.1060 + } catch (InterruptedException ie) {
1.1061 + node.wasInterrupted = true;
1.1062 + }
1.1063 + }
1.1064 + }
1.1065 +
1.1066 + if (node != null) {
1.1067 + if (node.thread != null)
1.1068 + node.thread = null; // avoid need for unpark()
1.1069 + if (node.wasInterrupted && !node.interruptible)
1.1070 + Thread.currentThread().interrupt();
1.1071 + if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
1.1072 + return abortWait(phase); // possibly clean up on abort
1.1073 + }
1.1074 + releaseWaiters(phase);
1.1075 + return p;
1.1076 + }
1.1077 +
1.1078 + /**
1.1079 + * Wait nodes for Treiber stack representing wait queue
1.1080 + */
1.1081 + static final class QNode implements ForkJoinPool.ManagedBlocker {
1.1082 + final Phaser phaser;
1.1083 + final int phase;
1.1084 + final boolean interruptible;
1.1085 + final boolean timed;
1.1086 + boolean wasInterrupted;
1.1087 + long nanos;
1.1088 + long lastTime;
1.1089 + volatile Thread thread; // nulled to cancel wait
1.1090 + QNode next;
1.1091 +
1.1092 + QNode(Phaser phaser, int phase, boolean interruptible,
1.1093 + boolean timed, long nanos) {
1.1094 + this.phaser = phaser;
1.1095 + this.phase = phase;
1.1096 + this.interruptible = interruptible;
1.1097 + this.nanos = nanos;
1.1098 + this.timed = timed;
1.1099 + this.lastTime = timed ? System.nanoTime() : 0L;
1.1100 + thread = Thread.currentThread();
1.1101 + }
1.1102 +
1.1103 + public boolean isReleasable() {
1.1104 + if (thread == null)
1.1105 + return true;
1.1106 + if (phaser.getPhase() != phase) {
1.1107 + thread = null;
1.1108 + return true;
1.1109 + }
1.1110 + if (Thread.interrupted())
1.1111 + wasInterrupted = true;
1.1112 + if (wasInterrupted && interruptible) {
1.1113 + thread = null;
1.1114 + return true;
1.1115 + }
1.1116 + if (timed) {
1.1117 + if (nanos > 0L) {
1.1118 + long now = System.nanoTime();
1.1119 + nanos -= now - lastTime;
1.1120 + lastTime = now;
1.1121 + }
1.1122 + if (nanos <= 0L) {
1.1123 + thread = null;
1.1124 + return true;
1.1125 + }
1.1126 + }
1.1127 + return false;
1.1128 + }
1.1129 +
1.1130 + public boolean block() {
1.1131 + if (isReleasable())
1.1132 + return true;
1.1133 + else if (!timed)
1.1134 + LockSupport.park(this);
1.1135 + else if (nanos > 0)
1.1136 + LockSupport.parkNanos(this, nanos);
1.1137 + return isReleasable();
1.1138 + }
1.1139 + }
1.1140 +
1.1141 + // Unsafe mechanics
1.1142 +
1.1143 + private static final sun.misc.Unsafe UNSAFE;
1.1144 + private static final long stateOffset;
1.1145 + static {
1.1146 + try {
1.1147 + UNSAFE = sun.misc.Unsafe.getUnsafe();
1.1148 + Class k = Phaser.class;
1.1149 + stateOffset = UNSAFE.objectFieldOffset
1.1150 + (k.getDeclaredField("state"));
1.1151 + } catch (Exception e) {
1.1152 + throw new Error(e);
1.1153 + }
1.1154 + }
1.1155 +}