rt/emul/compact/src/main/java/java/util/concurrent/Phaser.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 12:51:03 +0100
changeset 1895 bfaf3300b7ba
parent 1890 212417b74b72
permissions -rw-r--r--
Making java.util.concurrent package compilable except ForkJoinPool
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.concurrent.TimeUnit;
    39 import java.util.concurrent.TimeoutException;
    40 import java.util.concurrent.atomic.AtomicReference;
    41 import java.util.concurrent.locks.LockSupport;
    42 
    43 /**
    44  * A reusable synchronization barrier, similar in functionality to
    45  * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
    46  * {@link java.util.concurrent.CountDownLatch CountDownLatch}
    47  * but supporting more flexible usage.
    48  *
    49  * <p> <b>Registration.</b> Unlike the case for other barriers, the
    50  * number of parties <em>registered</em> to synchronize on a phaser
    51  * may vary over time.  Tasks may be registered at any time (using
    52  * methods {@link #register}, {@link #bulkRegister}, or forms of
    53  * constructors establishing initial numbers of parties), and
    54  * optionally deregistered upon any arrival (using {@link
    55  * #arriveAndDeregister}).  As is the case with most basic
    56  * synchronization constructs, registration and deregistration affect
    57  * only internal counts; they do not establish any further internal
    58  * bookkeeping, so tasks cannot query whether they are registered.
    59  * (However, you can introduce such bookkeeping by subclassing this
    60  * class.)
    61  *
    62  * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
    63  * Phaser} may be repeatedly awaited.  Method {@link
    64  * #arriveAndAwaitAdvance} has effect analogous to {@link
    65  * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
    66  * generation of a phaser has an associated phase number. The phase
    67  * number starts at zero, and advances when all parties arrive at the
    68  * phaser, wrapping around to zero after reaching {@code
    69  * Integer.MAX_VALUE}. The use of phase numbers enables independent
    70  * control of actions upon arrival at a phaser and upon awaiting
    71  * others, via two kinds of methods that may be invoked by any
    72  * registered party:
    73  *
    74  * <ul>
    75  *
    76  *   <li> <b>Arrival.</b> Methods {@link #arrive} and
    77  *       {@link #arriveAndDeregister} record arrival.  These methods
    78  *       do not block, but return an associated <em>arrival phase
    79  *       number</em>; that is, the phase number of the phaser to which
    80  *       the arrival applied. When the final party for a given phase
    81  *       arrives, an optional action is performed and the phase
    82  *       advances.  These actions are performed by the party
    83  *       triggering a phase advance, and are arranged by overriding
    84  *       method {@link #onAdvance(int, int)}, which also controls
    85  *       termination. Overriding this method is similar to, but more
    86  *       flexible than, providing a barrier action to a {@code
    87  *       CyclicBarrier}.
    88  *
    89  *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
    90  *       argument indicating an arrival phase number, and returns when
    91  *       the phaser advances to (or is already at) a different phase.
    92  *       Unlike similar constructions using {@code CyclicBarrier},
    93  *       method {@code awaitAdvance} continues to wait even if the
    94  *       waiting thread is interrupted. Interruptible and timeout
    95  *       versions are also available, but exceptions encountered while
    96  *       tasks wait interruptibly or with timeout do not change the
    97  *       state of the phaser. If necessary, you can perform any
    98  *       associated recovery within handlers of those exceptions,
    99  *       often after invoking {@code forceTermination}.  Phasers may
   100  *       also be used by tasks executing in a {@link ForkJoinPool},
   101  *       which will ensure sufficient parallelism to execute tasks
   102  *       when others are blocked waiting for a phase to advance.
   103  *
   104  * </ul>
   105  *
   106  * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
   107  * state, that may be checked using method {@link #isTerminated}. Upon
   108  * termination, all synchronization methods immediately return without
   109  * waiting for advance, as indicated by a negative return value.
   110  * Similarly, attempts to register upon termination have no effect.
   111  * Termination is triggered when an invocation of {@code onAdvance}
   112  * returns {@code true}. The default implementation returns {@code
   113  * true} if a deregistration has caused the number of registered
   114  * parties to become zero.  As illustrated below, when phasers control
   115  * actions with a fixed number of iterations, it is often convenient
   116  * to override this method to cause termination when the current phase
   117  * number reaches a threshold. Method {@link #forceTermination} is
   118  * also available to abruptly release waiting threads and allow them
   119  * to terminate.
   120  *
   121  * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
   122  * constructed in tree structures) to reduce contention. Phasers with
   123  * large numbers of parties that would otherwise experience heavy
   124  * synchronization contention costs may instead be set up so that
   125  * groups of sub-phasers share a common parent.  This may greatly
   126  * increase throughput even though it incurs greater per-operation
   127  * overhead.
   128  *
   129  * <p>In a tree of tiered phasers, registration and deregistration of
   130  * child phasers with their parent are managed automatically.
   131  * Whenever the number of registered parties of a child phaser becomes
   132  * non-zero (as established in the {@link #Phaser(Phaser,int)}
   133  * constructor, {@link #register}, or {@link #bulkRegister}), the
   134  * child phaser is registered with its parent.  Whenever the number of
   135  * registered parties becomes zero as the result of an invocation of
   136  * {@link #arriveAndDeregister}, the child phaser is deregistered
   137  * from its parent.
   138  *
   139  * <p><b>Monitoring.</b> While synchronization methods may be invoked
   140  * only by registered parties, the current state of a phaser may be
   141  * monitored by any caller.  At any given moment there are {@link
   142  * #getRegisteredParties} parties in total, of which {@link
   143  * #getArrivedParties} have arrived at the current phase ({@link
   144  * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
   145  * parties arrive, the phase advances.  The values returned by these
   146  * methods may reflect transient states and so are not in general
   147  * useful for synchronization control.  Method {@link #toString}
   148  * returns snapshots of these state queries in a form convenient for
   149  * informal monitoring.
   150  *
   151  * <p><b>Sample usages:</b>
   152  *
   153  * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
   154  * to control a one-shot action serving a variable number of parties.
   155  * The typical idiom is for the method setting this up to first
   156  * register, then start the actions, then deregister, as in:
   157  *
   158  *  <pre> {@code
   159  * void runTasks(List<Runnable> tasks) {
   160  *   final Phaser phaser = new Phaser(1); // "1" to register self
   161  *   // create and start threads
   162  *   for (final Runnable task : tasks) {
   163  *     phaser.register();
   164  *     new Thread() {
   165  *       public void run() {
   166  *         phaser.arriveAndAwaitAdvance(); // await all creation
   167  *         task.run();
   168  *       }
   169  *     }.start();
   170  *   }
   171  *
   172  *   // allow threads to start and deregister self
   173  *   phaser.arriveAndDeregister();
   174  * }}</pre>
   175  *
   176  * <p>One way to cause a set of threads to repeatedly perform actions
   177  * for a given number of iterations is to override {@code onAdvance}:
   178  *
   179  *  <pre> {@code
   180  * void startTasks(List<Runnable> tasks, final int iterations) {
   181  *   final Phaser phaser = new Phaser() {
   182  *     protected boolean onAdvance(int phase, int registeredParties) {
   183  *       return phase >= iterations || registeredParties == 0;
   184  *     }
   185  *   };
   186  *   phaser.register();
   187  *   for (final Runnable task : tasks) {
   188  *     phaser.register();
   189  *     new Thread() {
   190  *       public void run() {
   191  *         do {
   192  *           task.run();
   193  *           phaser.arriveAndAwaitAdvance();
   194  *         } while (!phaser.isTerminated());
   195  *       }
   196  *     }.start();
   197  *   }
   198  *   phaser.arriveAndDeregister(); // deregister self, don't wait
   199  * }}</pre>
   200  *
   201  * If the main task must later await termination, it
   202  * may re-register and then execute a similar loop:
   203  *  <pre> {@code
   204  *   // ...
   205  *   phaser.register();
   206  *   while (!phaser.isTerminated())
   207  *     phaser.arriveAndAwaitAdvance();}</pre>
   208  *
   209  * <p>Related constructions may be used to await particular phase numbers
   210  * in contexts where you are sure that the phase will never wrap around
   211  * {@code Integer.MAX_VALUE}. For example:
   212  *
   213  *  <pre> {@code
   214  * void awaitPhase(Phaser phaser, int phase) {
   215  *   int p = phaser.register(); // assumes caller not already registered
   216  *   while (p < phase) {
   217  *     if (phaser.isTerminated())
   218  *       // ... deal with unexpected termination
   219  *     else
   220  *       p = phaser.arriveAndAwaitAdvance();
   221  *   }
   222  *   phaser.arriveAndDeregister();
   223  * }}</pre>
   224  *
   225  *
   226  * <p>To create a set of {@code n} tasks using a tree of phasers, you
   227  * could use code of the following form, assuming a Task class with a
   228  * constructor accepting a {@code Phaser} that it registers with upon
   229  * construction. After invocation of {@code build(new Task[n], 0, n,
   230  * new Phaser())}, these tasks could then be started, for example by
   231  * submitting to a pool:
   232  *
   233  *  <pre> {@code
   234  * void build(Task[] tasks, int lo, int hi, Phaser ph) {
   235  *   if (hi - lo > TASKS_PER_PHASER) {
   236  *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
   237  *       int j = Math.min(i + TASKS_PER_PHASER, hi);
   238  *       build(tasks, i, j, new Phaser(ph));
   239  *     }
   240  *   } else {
   241  *     for (int i = lo; i < hi; ++i)
   242  *       tasks[i] = new Task(ph);
   243  *       // assumes new Task(ph) performs ph.register()
   244  *   }
   245  * }}</pre>
   246  *
   247  * The best value of {@code TASKS_PER_PHASER} depends mainly on
   248  * expected synchronization rates. A value as low as four may
   249  * be appropriate for extremely small per-phase task bodies (thus
   250  * high rates), or up to hundreds for extremely large ones.
   251  *
   252  * <p><b>Implementation notes</b>: This implementation restricts the
   253  * maximum number of parties to 65535. Attempts to register additional
   254  * parties result in {@code IllegalStateException}. However, you can and
   255  * should create tiered phasers to accommodate arbitrarily large sets
   256  * of participants.
   257  *
   258  * @since 1.7
   259  * @author Doug Lea
   260  */
   261 public class Phaser {
   262     /*
   263      * This class implements an extension of X10 "clocks".  Thanks to
   264      * Vijay Saraswat for the idea, and to Vivek Sarkar for
   265      * enhancements to extend functionality.
   266      */
   267 
   268     /**
   269      * Primary state representation, holding four bit-fields:
   270      *
   271      * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
   272      * parties    -- the number of parties to wait            (bits 16-31)
   273      * phase      -- the generation of the barrier            (bits 32-62)
   274      * terminated -- set if barrier is terminated             (bit  63 / sign)
   275      *
   276      * Except that a phaser with no registered parties is
   277      * distinguished by the otherwise illegal state of having zero
    278      * parties and one unarrived party (encoded as EMPTY below).
   279      *
   280      * To efficiently maintain atomicity, these values are packed into
   281      * a single (atomic) long. Good performance relies on keeping
   282      * state decoding and encoding simple, and keeping race windows
   283      * short.
   284      *
   285      * All state updates are performed via CAS except initial
   286      * registration of a sub-phaser (i.e., one with a non-null
   287      * parent).  In this (relatively rare) case, we use built-in
   288      * synchronization to lock while first registering with its
   289      * parent.
   290      *
   291      * The phase of a subphaser is allowed to lag that of its
   292      * ancestors until it is actually accessed -- see method
   293      * reconcileState.
   294      */
    295     private volatile long state;
    296 
    297     private static final int  MAX_PARTIES     = 0xffff;      // largest value a 16-bit count field can hold
    298     private static final int  MAX_PHASE       = Integer.MAX_VALUE; // mask applied to phase + 1 when advancing
    299     private static final int  PARTIES_SHIFT   = 16;          // bit offset of the parties field in state
    300     private static final int  PHASE_SHIFT     = 32;          // bit offset of the phase field in state
    301     private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    302     private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    303     private static final long TERMINATION_BIT = 1L << 63;    // sign bit; makes the extracted phase negative
    304 
    305     // some special values
    306     private static final int  ONE_ARRIVAL     = 1;                  // state delta: one fewer unarrived party
    307     private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT; // state delta: one fewer registered party
    308     private static final int  EMPTY           = 1;                  // counts value (0 parties, 1 unarrived) meaning "no parties"
   309 
   310     // The following unpacking methods are usually manually inlined
   311 
   312     private static int unarrivedOf(long s) {
   313         int counts = (int)s;
   314         return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
   315     }
   316 
   317     private static int partiesOf(long s) {
   318         return (int)s >>> PARTIES_SHIFT;
   319     }
   320 
   321     private static int phaseOf(long s) {
   322         return (int)(s >>> PHASE_SHIFT);
   323     }
   324 
   325     private static int arrivedOf(long s) {
   326         int counts = (int)s;
   327         return (counts == EMPTY) ? 0 :
   328             (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
   329     }
   330 
    331     /**
    332      * The parent of this phaser, or null if none.
    333      */
    334     private final Phaser parent;
    335 
    336     /**
    337      * The root of phaser tree. Equals this if not in a tree.
    338      */
    339     private final Phaser root;
    340 
    341     /**
    342      * Heads of Treiber stacks for waiting threads. To eliminate
    343      * contention when releasing some threads while adding others, we
    344      * use two of them, alternating across even and odd phases.
    345      * Subphasers share queues with root to speed up releases.
    346      */
    347     private final AtomicReference<QNode> evenQ; // selected by queueFor when the phase's low bit is 0
    348     private final AtomicReference<QNode> oddQ;  // selected by queueFor when the phase's low bit is 1
   349 
   350     private AtomicReference<QNode> queueFor(int phase) {
   351         return ((phase & 1) == 0) ? evenQ : oddQ;
   352     }
   353 
   354     /**
   355      * Returns message string for bounds exceptions on arrival.
   356      */
   357     private String badArrive(long s) {
   358         return "Attempted arrival of unregistered party for " +
   359             stateToString(s);
   360     }
   361 
   362     /**
   363      * Returns message string for bounds exceptions on registration.
   364      */
   365     private String badRegister(long s) {
   366         return "Attempt to register more than " +
   367             MAX_PARTIES + " parties for " + stateToString(s);
   368     }
   369 
    370     /**
    371      * Main implementation for methods arrive and arriveAndDeregister.
    372      * Manually tuned to speed up and minimize race windows for the
    373      * common case of just decrementing unarrived field.
    374      *
             * <p>When the decrement consumes the last unarrived party, a root
             * phaser invokes {@code onAdvance}, installs the next-phase state
             * and releases waiters of the old phase; a sub-phaser instead
             * forwards the arrival to its parent and returns the parent's
             * result.
             *
    375      * @param deregister false for arrive, true for arriveAndDeregister
             * @return the phase observed at arrival, or a negative value if
             *         the phaser is terminated
             * @throws IllegalStateException if the state is EMPTY or has no
             *         unarrived parties left
    376      */
    377     private int doArrive(boolean deregister) {
                // Amount subtracted from state: one unarrived, plus one party
                // when deregistering.
    378         int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
    379         final Phaser root = this.root;
    380         for (;;) {
                    // Sub-phasers first catch up with the root's phase.
    381             long s = (root == this) ? state : reconcileState();
    382             int phase = (int)(s >>> PHASE_SHIFT);
    383             int counts = (int)s;
                    // Unarrived count as it would be after this arrival.
    384             int unarrived = (counts & UNARRIVED_MASK) - 1;
    385             if (phase < 0)
                        // Termination bit set: report it without touching state.
    386                 return phase;
    387             else if (counts == EMPTY || unarrived < 0) {
                        // Nobody left to arrive. For a sub-phaser, only fail if
                        // reconciling again yields the same state; otherwise retry.
    388                 if (root == this || reconcileState() == s)
    389                     throw new IllegalStateException(badArrive(s));
    390             }
                    // Note: s is updated to the new value as the second CAS
                    // argument is evaluated, so below it names the post-arrival state.
    391             else if (compareAndSwapLong(s, s-=adj)) {
    392                 if (unarrived == 0) {
    393                     long n = s & PARTIES_MASK;  // base of next state
    394                     int nextUnarrived = (int)n >>> PARTIES_SHIFT;
    395                     if (root != this)
                                // Last arrival at a sub-phaser: arrive at the parent,
                                // deregistering from it when no parties remain here.
    396                         return parent.doArrive(nextUnarrived == 0);
    397                     if (onAdvance(phase, nextUnarrived))
    398                         n |= TERMINATION_BIT;
    399                     else if (nextUnarrived == 0)
    400                         n |= EMPTY;
    401                     else
                                // Reset unarrived to the number of registered parties.
    402                         n |= nextUnarrived;
                            // Next phase number; the mask wraps it back to zero
                            // after MAX_PHASE.
    403                     n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
    404                     compareAndSwapLong(s, n);
    405                     releaseWaiters(phase);
    406                 }
    407                 return phase;
    408             }
    409         }
    410     }
   411 
    412     /**
    413      * Implementation of register, bulkRegister
    414      *
    415      * @param registrations number to add to both parties and
    416      * unarrived fields. Must be greater than zero.
             * @return the phase to which the registration applied, or a
             *         negative value if the phaser is terminated (in which
             *         case the state is left unchanged)
             * @throws IllegalStateException if the registration would bring
             *         the parties count above MAX_PARTIES
    417      */
    418     private int doRegister(int registrations) {
    419         // adjustment to state
    420         long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
    421         final Phaser parent = this.parent;
    422         int phase;
    423         for (;;) {
    424             long s = state;
    425             int counts = (int)s;
    426             int parties = counts >>> PARTIES_SHIFT;
    427             int unarrived = counts & UNARRIVED_MASK;
                    // Written as a subtraction so the bound check itself cannot overflow.
    428             if (registrations > MAX_PARTIES - parties)
    429                 throw new IllegalStateException(badRegister(s));
    430             else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
                        // Terminated: leave the loop and return the negative phase.
    431                 break;
    432             else if (counts != EMPTY) {             // not 1st registration
                        // A sub-phaser only proceeds if reconciling with the root
                        // left the state as read; otherwise the loop re-reads it.
    433                 if (parent == null || reconcileState() == s) {
    434                     if (unarrived == 0)             // wait out advance
    435                         root.internalAwaitAdvance(phase, null);
    436                     else if (compareAndSwapLong(                                                       s, s + adj))
    437                         break;
    438                 }
    439             }
    440             else if (parent == null) {              // 1st root registration
                        // Replace the EMPTY counts with the new counts, keeping the phase.
    441                 long next = ((long)phase << PHASE_SHIFT) | adj;
    442                 if (compareAndSwapLong(s, next))
    443                     break;
    444             }
    445             else {
    446                 synchronized (this) {               // 1st sub registration
    447                     if (state == s) {               // recheck under lock
                                // This sub-phaser becomes one party of its parent.
    448                         parent.doRegister(1);
    449                         do {                        // force current phase
    450                             phase = (int)(root.state >>> PHASE_SHIFT);
    451                             // assert phase < 0 || (int)state == EMPTY;
    452                         } while (!compareAndSwapLong
    453                                  (state,
    454                                   ((long)phase << PHASE_SHIFT) | adj));
    455                         break;
    456                     }
                            // State changed before the lock was taken: fall through
                            // and retry from the top of the loop.
    457                 }
    458             }
    459         }
    460         return phase;
    461     }
   462 
    463     /**
    464      * Resolves lagged phase propagation from root if necessary.
    465      * Reconciliation normally occurs when root has advanced but
    466      * subphasers have not yet done so, in which case they must finish
    467      * their own advance by setting unarrived to parties (or if
    468      * parties is zero, resetting to unregistered EMPTY state).
    469      * However, this method may also be called when "floating"
    470      * subphasers with possibly some unarrived parties are merely
    471      * catching up to current phase, in which case counts are
    472      * unaffected.
    473      *
             * <p>For a root phaser this simply returns the current state.
             *
    474      * @return reconciled state
    475      */
    476     private long reconcileState() {
    477         final Phaser root = this.root;
    478         long s = state;
    479         if (root != this) {
    480             int phase, u, p;
    481             // CAS root phase with current parties; possibly trip unarrived
                    // Loop while the root's phase differs from ours and the CAS that
                    // installs it fails. The replacement state keeps the parties
                    // field and sets the low field to EMPTY when parties is zero,
                    // to parties when unarrived was zero, and to the existing
                    // unarrived count otherwise. s takes the replacement value as
                    // the CAS arguments are evaluated, so on success it is the
                    // value returned; on failure it is re-read from state.
    482             while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
    483                    (int)(s >>> PHASE_SHIFT) &&
    484                    !compareAndSwapLong
    485                    (s,
    486                     s = (((long)phase << PHASE_SHIFT) |
    487                          (s & PARTIES_MASK) |
    488                          ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
    489                           (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
    490                 s = state;
    491         }
    492         return s;
    493     }
   494 
   495     /**
   496      * Creates a new phaser with no initially registered parties, no
   497      * parent, and initial phase number 0. Any thread using this
   498      * phaser will need to first register for it.
   499      */
   500     public Phaser() {
   501         this(null, 0);
   502     }
   503 
   504     /**
   505      * Creates a new phaser with the given number of registered
   506      * unarrived parties, no parent, and initial phase number 0.
   507      *
   508      * @param parties the number of parties required to advance to the
   509      * next phase
   510      * @throws IllegalArgumentException if parties less than zero
   511      * or greater than the maximum number of parties supported
   512      */
   513     public Phaser(int parties) {
   514         this(null, parties);
   515     }
   516 
   517     /**
   518      * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
   519      *
   520      * @param parent the parent phaser
   521      */
   522     public Phaser(Phaser parent) {
   523         this(parent, 0);
   524     }
   525 
   526     /**
   527      * Creates a new phaser with the given parent and number of
   528      * registered unarrived parties.  When the given parent is non-null
   529      * and the given number of parties is greater than zero, this
   530      * child phaser is registered with its parent.
   531      *
   532      * @param parent the parent phaser
   533      * @param parties the number of parties required to advance to the
   534      * next phase
   535      * @throws IllegalArgumentException if parties less than zero
   536      * or greater than the maximum number of parties supported
   537      */
   538     public Phaser(Phaser parent, int parties) {
   539         if (parties >>> PARTIES_SHIFT != 0)
   540             throw new IllegalArgumentException("Illegal number of parties");
   541         int phase = 0;
   542         this.parent = parent;
   543         if (parent != null) {
   544             final Phaser root = parent.root;
   545             this.root = root;
   546             this.evenQ = root.evenQ;
   547             this.oddQ = root.oddQ;
   548             if (parties != 0)
   549                 phase = parent.doRegister(1);
   550         }
   551         else {
   552             this.root = this;
   553             this.evenQ = new AtomicReference<QNode>();
   554             this.oddQ = new AtomicReference<QNode>();
   555         }
   556         this.state = (parties == 0) ? (long)EMPTY :
   557             ((long)phase << PHASE_SHIFT) |
   558             ((long)parties << PARTIES_SHIFT) |
   559             ((long)parties);
   560     }
   561 
   562     /**
   563      * Adds a new unarrived party to this phaser.  If an ongoing
   564      * invocation of {@link #onAdvance} is in progress, this method
   565      * may await its completion before returning.  If this phaser has
   566      * a parent, and this phaser previously had no registered parties,
   567      * this child phaser is also registered with its parent. If
   568      * this phaser is terminated, the attempt to register has
   569      * no effect, and a negative value is returned.
   570      *
   571      * @return the arrival phase number to which this registration
   572      * applied.  If this value is negative, then this phaser has
   573      * terminated, in which case registration has no effect.
   574      * @throws IllegalStateException if attempting to register more
   575      * than the maximum supported number of parties
   576      */
   577     public int register() {
   578         return doRegister(1);
   579     }
   580 
   581     /**
   582      * Adds the given number of new unarrived parties to this phaser.
   583      * If an ongoing invocation of {@link #onAdvance} is in progress,
   584      * this method may await its completion before returning.  If this
   585      * phaser has a parent, and the given number of parties is greater
   586      * than zero, and this phaser previously had no registered
   587      * parties, this child phaser is also registered with its parent.
   588      * If this phaser is terminated, the attempt to register has no
   589      * effect, and a negative value is returned.
   590      *
   591      * @param parties the number of additional parties required to
   592      * advance to the next phase
   593      * @return the arrival phase number to which this registration
   594      * applied.  If this value is negative, then this phaser has
   595      * terminated, in which case registration has no effect.
   596      * @throws IllegalStateException if attempting to register more
   597      * than the maximum supported number of parties
   598      * @throws IllegalArgumentException if {@code parties < 0}
   599      */
   600     public int bulkRegister(int parties) {
   601         if (parties < 0)
   602             throw new IllegalArgumentException();
   603         if (parties == 0)
   604             return getPhase();
   605         return doRegister(parties);
   606     }
   607 
   608     /**
   609      * Arrives at this phaser, without waiting for others to arrive.
   610      *
   611      * <p>It is a usage error for an unregistered party to invoke this
   612      * method.  However, this error may result in an {@code
   613      * IllegalStateException} only upon some subsequent operation on
   614      * this phaser, if ever.
   615      *
   616      * @return the arrival phase number, or a negative value if terminated
   617      * @throws IllegalStateException if not terminated and the number
   618      * of unarrived parties would become negative
   619      */
   620     public int arrive() {
   621         return doArrive(false);
   622     }
   623 
   624     /**
   625      * Arrives at this phaser and deregisters from it without waiting
   626      * for others to arrive. Deregistration reduces the number of
   627      * parties required to advance in future phases.  If this phaser
   628      * has a parent, and deregistration causes this phaser to have
   629      * zero parties, this phaser is also deregistered from its parent.
   630      *
   631      * <p>It is a usage error for an unregistered party to invoke this
   632      * method.  However, this error may result in an {@code
   633      * IllegalStateException} only upon some subsequent operation on
   634      * this phaser, if ever.
   635      *
   636      * @return the arrival phase number, or a negative value if terminated
   637      * @throws IllegalStateException if not terminated and the number
   638      * of registered or unarrived parties would become negative
   639      */
   640     public int arriveAndDeregister() {
   641         return doArrive(true);
   642     }
   643 
   644     /**
   645      * Arrives at this phaser and awaits others. Equivalent in effect
   646      * to {@code awaitAdvance(arrive())}.  If you need to await with
   647      * interruption or timeout, you can arrange this with an analogous
   648      * construction using one of the other forms of the {@code
   649      * awaitAdvance} method.  If instead you need to deregister upon
   650      * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
   651      *
   652      * <p>It is a usage error for an unregistered party to invoke this
   653      * method.  However, this error may result in an {@code
   654      * IllegalStateException} only upon some subsequent operation on
   655      * this phaser, if ever.
   656      *
   657      * @return the arrival phase number, or the (negative)
   658      * {@linkplain #getPhase() current phase} if terminated
   659      * @throws IllegalStateException if not terminated and the number
   660      * of unarrived parties would become negative
   661      */
   662     public int arriveAndAwaitAdvance() {
   663         // Specialization of doArrive+awaitAdvance eliminating some reads/paths
   664         final Phaser root = this.root;
               // Retry loop: every iteration takes a fresh snapshot of the state and
               // either returns, throws, or falls through to try again.
   665         for (;;) {
                   // The root reads its own state field directly; any other phaser
                   // obtains the snapshot from reconcileState().
   666             long s = (root == this) ? state : reconcileState();
   667             int phase = (int)(s >>> PHASE_SHIFT);
   668             int counts = (int)s;
                   // Unarrived count as it would be once this arrival is recorded.
   669             int unarrived = (counts & UNARRIVED_MASK) - 1;
   670             if (phase < 0)
   671                 return phase;
   672             else if (counts == EMPTY || unarrived < 0) {
                       // Only report the bad arrival if a second reconcileState() call
                       // still yields the same snapshot; otherwise loop and re-read.
   673                 if (reconcileState() == s)
   674                     throw new IllegalStateException(badArrive(s));
   675             }
                   // Arguments are evaluated left to right, so the expected value is s
                   // before the subtraction and the new value is s after it; s itself
                   // holds the decremented value from here on. The helper compares
                   // against and writes this phaser's own state field.
   676             else if (compareAndSwapLong(s,
   677                                                s -= ONE_ARRIVAL)) {
                       // Other parties are still outstanding: wait on the root.
   678                 if (unarrived != 0)
   679                     return root.internalAwaitAdvance(phase, null);
                       // Last arrival at a non-root phaser: hand the arrival to the parent.
   680                 if (root != this)
   681                     return parent.arriveAndAwaitAdvance();
                       // Last arrival at the root: assemble the next state word in n.
   682                 long n = s & PARTIES_MASK;  // base of next state
   683                 int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                       // onAdvance decides termination; with zero parties the counts
                       // become EMPTY, otherwise nextUnarrived is OR-ed into the low bits.
   684                 if (onAdvance(phase, nextUnarrived))
   685                     n |= TERMINATION_BIT;
   686                 else if (nextUnarrived == 0)
   687                     n |= EMPTY;
   688                 else
   689                     n |= nextUnarrived;
                       // The phase number is masked with MAX_PHASE after incrementing.
   690                 int nextPhase = (phase + 1) & MAX_PHASE;
   691                 n |= (long)nextPhase << PHASE_SHIFT;
                       // If the state changed underneath us, report whatever phase the
                       // state now shows instead of nextPhase.
   692                 if (!compareAndSwapLong(s, n))
   693                     return (int)(state >>> PHASE_SHIFT); // terminated
   694                 releaseWaiters(phase);
   695                 return nextPhase;
   696             }
   697         }
   698     }
   699 
   700     /**
   701      * Awaits the phase of this phaser to advance from the given phase
   702      * value, returning immediately if the current phase is not equal
   703      * to the given phase value or this phaser is terminated.
   704      *
   705      * @param phase an arrival phase number, or negative value if
   706      * terminated; this argument is normally the value returned by a
   707      * previous call to {@code arrive} or {@code arriveAndDeregister}.
   708      * @return the next arrival phase number, or the argument if it is
   709      * negative, or the (negative) {@linkplain #getPhase() current phase}
   710      * if terminated
   711      */
   712     public int awaitAdvance(int phase) {
   713         final Phaser root = this.root;
   714         long s = (root == this) ? state : reconcileState();
   715         int p = (int)(s >>> PHASE_SHIFT);
   716         if (phase < 0)
   717             return phase;
   718         if (p == phase)
   719             return root.internalAwaitAdvance(phase, null);
   720         return p;
   721     }
   722 
   723     /**
   724      * Awaits the phase of this phaser to advance from the given phase
   725      * value, throwing {@code InterruptedException} if interrupted
   726      * while waiting, or returning immediately if the current phase is
   727      * not equal to the given phase value or this phaser is
   728      * terminated.
   729      *
   730      * @param phase an arrival phase number, or negative value if
   731      * terminated; this argument is normally the value returned by a
   732      * previous call to {@code arrive} or {@code arriveAndDeregister}.
   733      * @return the next arrival phase number, or the argument if it is
   734      * negative, or the (negative) {@linkplain #getPhase() current phase}
   735      * if terminated
   736      * @throws InterruptedException if thread interrupted while waiting
   737      */
   738     public int awaitAdvanceInterruptibly(int phase)
   739         throws InterruptedException {
   740         final Phaser root = this.root;
   741         long s = (root == this) ? state : reconcileState();
   742         int p = (int)(s >>> PHASE_SHIFT);
   743         if (phase < 0)
   744             return phase;
   745         if (p == phase) {
   746             QNode node = new QNode(this, phase, true, false, 0L);
   747             p = root.internalAwaitAdvance(phase, node);
   748             if (node.wasInterrupted)
   749                 throw new InterruptedException();
   750         }
   751         return p;
   752     }
   753 
   754     /**
   755      * Awaits the phase of this phaser to advance from the given phase
   756      * value or the given timeout to elapse, throwing {@code
   757      * InterruptedException} if interrupted while waiting, or
   758      * returning immediately if the current phase is not equal to the
   759      * given phase value or this phaser is terminated.
   760      *
   761      * @param phase an arrival phase number, or negative value if
   762      * terminated; this argument is normally the value returned by a
   763      * previous call to {@code arrive} or {@code arriveAndDeregister}.
   764      * @param timeout how long to wait before giving up, in units of
   765      *        {@code unit}
   766      * @param unit a {@code TimeUnit} determining how to interpret the
   767      *        {@code timeout} parameter
   768      * @return the next arrival phase number, or the argument if it is
   769      * negative, or the (negative) {@linkplain #getPhase() current phase}
   770      * if terminated
   771      * @throws InterruptedException if thread interrupted while waiting
   772      * @throws TimeoutException if timed out while waiting
   773      */
   774     public int awaitAdvanceInterruptibly(int phase,
   775                                          long timeout, TimeUnit unit)
   776         throws InterruptedException, TimeoutException {
   777         long nanos = unit.toNanos(timeout);
   778         final Phaser root = this.root;
   779         long s = (root == this) ? state : reconcileState();
   780         int p = (int)(s >>> PHASE_SHIFT);
   781         if (phase < 0)
   782             return phase;
   783         if (p == phase) {
   784             QNode node = new QNode(this, phase, true, true, nanos);
   785             p = root.internalAwaitAdvance(phase, node);
   786             if (node.wasInterrupted)
   787                 throw new InterruptedException();
   788             else if (p == phase)
   789                 throw new TimeoutException();
   790         }
   791         return p;
   792     }
   793 
   794     /**
   795      * Forces this phaser to enter termination state.  Counts of
   796      * registered parties are unaffected.  If this phaser is a member
   797      * of a tiered set of phasers, then all of the phasers in the set
   798      * are terminated.  If this phaser is already terminated, this
   799      * method has no effect.  This method may be useful for
   800      * coordinating recovery after one or more tasks encounter
   801      * unexpected exceptions.
   802      */
   803     public void forceTermination() {
   804         // Only need to change root state
   805         final Phaser root = this.root;
   806         long s;
   807         while ((s = root.state) >= 0) {
   808             if (compareAndSwapLong(                                          s, s | TERMINATION_BIT)) {
   809                 // signal all threads
   810                 releaseWaiters(0);
   811                 releaseWaiters(1);
   812                 return;
   813             }
   814         }
   815     }
   816 
   817     /**
   818      * Returns the current phase number. The maximum phase number is
   819      * {@code Integer.MAX_VALUE}, after which it restarts at
   820      * zero. Upon termination, the phase number is negative,
   821      * in which case the prevailing phase prior to termination
   822      * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
   823      *
   824      * @return the phase number, or a negative value if terminated
   825      */
   826     public final int getPhase() {
   827         return (int)(root.state >>> PHASE_SHIFT);
   828     }
   829 
   830     /**
   831      * Returns the number of parties registered at this phaser.
   832      *
   833      * @return the number of parties
   834      */
   835     public int getRegisteredParties() {
   836         return partiesOf(state);
   837     }
   838 
   839     /**
   840      * Returns the number of registered parties that have arrived at
   841      * the current phase of this phaser. If this phaser has terminated,
   842      * the returned value is meaningless and arbitrary.
   843      *
   844      * @return the number of arrived parties
   845      */
   846     public int getArrivedParties() {
   847         return arrivedOf(reconcileState());
   848     }
   849 
   850     /**
   851      * Returns the number of registered parties that have not yet
   852      * arrived at the current phase of this phaser. If this phaser has
   853      * terminated, the returned value is meaningless and arbitrary.
   854      *
   855      * @return the number of unarrived parties
   856      */
   857     public int getUnarrivedParties() {
   858         return unarrivedOf(reconcileState());
   859     }
   860 
   861     /**
   862      * Returns the parent of this phaser, or {@code null} if none.
   863      *
   864      * @return the parent of this phaser, or {@code null} if none
   865      */
   866     public Phaser getParent() {
   867         return parent;
   868     }
   869 
   870     /**
   871      * Returns the root ancestor of this phaser, which is the same as
   872      * this phaser if it has no parent.
   873      *
   874      * @return the root ancestor of this phaser
   875      */
   876     public Phaser getRoot() {
   877         return root;
   878     }
   879 
   880     /**
   881      * Returns {@code true} if this phaser has been terminated.
   882      *
   883      * @return {@code true} if this phaser has been terminated
   884      */
   885     public boolean isTerminated() {
   886         return root.state < 0L;
   887     }
   888 
   889     /**
   890      * Overridable method to perform an action upon impending phase
   891      * advance, and to control termination. This method is invoked
   892      * upon arrival of the party advancing this phaser (when all other
   893      * waiting parties are dormant).  If this method returns {@code
   894      * true}, this phaser will be set to a final termination state
   895      * upon advance, and subsequent calls to {@link #isTerminated}
   896      * will return true. Any (unchecked) Exception or Error thrown by
   897      * an invocation of this method is propagated to the party
   898      * attempting to advance this phaser, in which case no advance
   899      * occurs.
   900      *
   901      * <p>The arguments to this method provide the state of the phaser
   902      * prevailing for the current transition.  The effects of invoking
   903      * arrival, registration, and waiting methods on this phaser from
   904      * within {@code onAdvance} are unspecified and should not be
   905      * relied on.
   906      *
   907      * <p>If this phaser is a member of a tiered set of phasers, then
   908      * {@code onAdvance} is invoked only for its root phaser on each
   909      * advance.
   910      *
   911      * <p>To support the most common use cases, the default
   912      * implementation of this method returns {@code true} when the
   913      * number of registered parties has become zero as the result of a
   914      * party invoking {@code arriveAndDeregister}.  You can disable
   915      * this behavior, thus enabling continuation upon future
   916      * registrations, by overriding this method to always return
   917      * {@code false}:
   918      *
   919      * <pre> {@code
   920      * Phaser phaser = new Phaser() {
   921      *   protected boolean onAdvance(int phase, int parties) { return false; }
   922      * }}</pre>
   923      *
   924      * @param phase the current phase number on entry to this method,
   925      * before this phaser is advanced
   926      * @param registeredParties the current number of registered parties
   927      * @return {@code true} if this phaser should terminate
   928      */
   929     protected boolean onAdvance(int phase, int registeredParties) {
   930         return registeredParties == 0;
   931     }
   932 
   933     /**
   934      * Returns a string identifying this phaser, as well as its
   935      * state.  The state, in brackets, includes the String {@code
   936      * "phase = "} followed by the phase number, {@code "parties = "}
   937      * followed by the number of registered parties, and {@code
   938      * "arrived = "} followed by the number of arrived parties.
   939      *
   940      * @return a string identifying this phaser, as well as its state
   941      */
   942     public String toString() {
   943         return stateToString(reconcileState());
   944     }
   945 
   946     /**
   947      * Implementation of toString and string-based error messages
   948      */
   949     private String stateToString(long s) {
   950         return super.toString() +
   951             "[phase = " + phaseOf(s) +
   952             " parties = " + partiesOf(s) +
   953             " arrived = " + arrivedOf(s) + "]";
   954     }
   955 
   956     // Waiting mechanics
   957 
   958     /**
   959      * Removes and signals threads from queue for phase.
   960      */
   961     private void releaseWaiters(int phase) {
   962         QNode q;   // first element of queue
   963         Thread t;  // its thread
   964         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
   965         while ((q = head.get()) != null &&
   966                q.phase != (int)(root.state >>> PHASE_SHIFT)) {
   967             if (head.compareAndSet(q, q.next) &&
   968                 (t = q.thread) != null) {
   969                 q.thread = null;
   970                 LockSupport.unpark(t);
   971             }
   972         }
   973     }
   974 
   975     /**
   976      * Variant of releaseWaiters that additionally tries to remove any
   977      * nodes no longer waiting for advance due to timeout or
   978      * interrupt. Currently, nodes are removed only if they are at
   979      * head of queue, which suffices to reduce memory footprint in
   980      * most usages.
   981      *
   982      * @return current phase on exit
   983      */
   984     private int abortWait(int phase) {
   985         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
   986         for (;;) {
   987             Thread t;
   988             QNode q = head.get();
   989             int p = (int)(root.state >>> PHASE_SHIFT);
   990             if (q == null || ((t = q.thread) != null && q.phase == p))
   991                 return p;
   992             if (head.compareAndSet(q, q.next) && t != null) {
   993                 q.thread = null;
   994                 LockSupport.unpark(t);
   995             }
   996         }
   997     }
   998 
   999     /** The number of CPUs, for spin control */
  1000     private static final int NCPU = 1; // a fixed literal in this file, not a runtime query
  1001 
  1002     /**
  1003      * The number of times to spin before blocking while waiting for
  1004      * advance, per arrival while waiting. On multiprocessors, fully
  1005      * blocking and waking up a large number of threads all at once is
  1006      * usually a very slow process, so we use rechargeable spins to
  1007      * avoid it when threads regularly arrive: When a thread in
  1008      * internalAwaitAdvance notices another arrival before blocking,
  1009      * and there appear to be enough CPUs available, it spins
  1010      * SPINS_PER_ARRIVAL more times before blocking. The value trades
  1011      * off good-citizenship vs big unnecessary slowdowns.
            *
            * <p>Because {@code NCPU} is the constant 1 in this file, the
            * expression below always takes its first branch and evaluates
            * to 1; the {@code 1 << 8} alternative is never selected.
  1012      */
  1013     static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
  1014 
  1015     /**
  1016      * Possibly blocks and waits for phase to advance unless aborted.
  1017      * Call only from root node.
  1018      *
  1019      * @param phase current phase
  1020      * @param node if non-null, the wait node to track interrupt and timeout;
  1021      * if null, denotes noninterruptible wait
  1022      * @return current phase
  1023      */
  1024     private int internalAwaitAdvance(int phase, QNode node) {
               // Waits in three escalating stages while this phaser's state still shows
               // the given phase: spin without a node, push the node onto the wait
               // queue, then block through ForkJoinPool.managedBlock.
  1025         releaseWaiters(phase-1);          // ensure old queue clean
  1026         boolean queued = false;           // true when node is enqueued
  1027         int lastUnarrived = 0;            // to increase spins upon change
  1028         int spins = SPINS_PER_ARRIVAL;
  1029         long s;
  1030         int p;
               // Each iteration re-reads the state into s and its phase into p.
  1031         while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
  1032             if (node == null) {           // spinning in noninterruptible mode
  1033                 int unarrived = (int)s & UNARRIVED_MASK;
                       // Recharge the spin budget when the unarrived count changed
                       // and the new count is below NCPU.
  1034                 if (unarrived != lastUnarrived &&
  1035                     (lastUnarrived = unarrived) < NCPU)
  1036                     spins += SPINS_PER_ARRIVAL;
                       // Thread.interrupted() clears the flag; it is remembered on the
                       // node created below and re-asserted after the loop.
  1037                 boolean interrupted = Thread.interrupted();
  1038                 if (interrupted || --spins < 0) { // need node to record intr
  1039                     node = new QNode(this, phase, false, false, 0L);
  1040                     node.wasInterrupted = interrupted;
  1041                 }
  1042             }
  1043             else if (node.isReleasable()) // done or aborted
  1044                 break;
  1045             else if (!queued) {           // push onto queue
                       // Even phases use evenQ, odd phases use oddQ.
  1046                 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
  1047                 QNode q = node.next = head.get();
                       // Only push if the current head (if any) belongs to the same
                       // phase and the state still shows that phase; a failed CAS
                       // leaves queued false so the push is retried next iteration.
  1048                 if ((q == null || q.phase == phase) &&
  1049                     (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
  1050                     queued = head.compareAndSet(q, node);
  1051             }
  1052             else {
  1053                 try {
  1054                     ForkJoinPool.managedBlock(node);
  1055                 } catch (InterruptedException ie) {
                           // Not propagated here; recorded on the node for the caller.
  1056                     node.wasInterrupted = true;
  1057                 }
  1058             }
  1059         }
  1060
  1061         if (node != null) {
  1062             if (node.thread != null)
  1063                 node.thread = null;       // avoid need for unpark()
                   // For a non-interruptible node the interrupt status is restored on
                   // the current thread rather than surfaced as an exception.
  1064             if (node.wasInterrupted && !node.interruptible)
  1065                 Thread.currentThread().interrupt();
                   // Loop exited via break with the phase unchanged, and a fresh read
                   // still shows the same phase: the wait was aborted.
  1066             if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
  1067                 return abortWait(phase); // possibly clean up on abort
  1068         }
  1069         releaseWaiters(phase);
  1070         return p;
  1071     }
  1072 
  1073     private boolean compareAndSwapLong(long s, long l) {
  1074         if (this.state == s) {
  1075             this.state = l;
  1076             return true;
  1077         }
  1078         return false;
  1079     }
  1080 
  1081     /**
  1082      * Wait nodes for Treiber stack representing wait queue
  1083      */
  1084     static final class QNode implements ForkJoinPool.ManagedBlocker {
  1085         final Phaser phaser;
  1086         final int phase;
  1087         final boolean interruptible;
  1088         final boolean timed;
  1089         boolean wasInterrupted;
  1090         long nanos;
  1091         long lastTime;
  1092         volatile Thread thread; // nulled to cancel wait
  1093         QNode next;
  1094 
  1095         QNode(Phaser phaser, int phase, boolean interruptible,
  1096               boolean timed, long nanos) {
  1097             this.phaser = phaser;
  1098             this.phase = phase;
  1099             this.interruptible = interruptible;
  1100             this.nanos = nanos;
  1101             this.timed = timed;
  1102             this.lastTime = timed ? System.nanoTime() : 0L;
  1103             thread = Thread.currentThread();
  1104         }
  1105 
  1106         public boolean isReleasable() {
  1107             if (thread == null)
  1108                 return true;
  1109             if (phaser.getPhase() != phase) {
  1110                 thread = null;
  1111                 return true;
  1112             }
  1113             if (Thread.interrupted())
  1114                 wasInterrupted = true;
  1115             if (wasInterrupted && interruptible) {
  1116                 thread = null;
  1117                 return true;
  1118             }
  1119             if (timed) {
  1120                 if (nanos > 0L) {
  1121                     long now = System.nanoTime();
  1122                     nanos -= now - lastTime;
  1123                     lastTime = now;
  1124                 }
  1125                 if (nanos <= 0L) {
  1126                     thread = null;
  1127                     return true;
  1128                 }
  1129             }
  1130             return false;
  1131         }
  1132 
  1133         public boolean block() {
  1134             if (isReleasable())
  1135                 return true;
  1136             else if (!timed)
  1137                 LockSupport.park(this);
  1138             else if (nanos > 0)
  1139                 LockSupport.parkNanos(this, nanos);
  1140             return isReleasable();
  1141         }
  1142     }
  1143 }