rt/emul/compact/src/main/java/java/util/concurrent/Phaser.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branchjdk7-b147
changeset 1890 212417b74b72
child 1895 bfaf3300b7ba
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.concurrent.TimeUnit;
    39 import java.util.concurrent.TimeoutException;
    40 import java.util.concurrent.atomic.AtomicReference;
    41 import java.util.concurrent.locks.LockSupport;
    42 
    43 /**
    44  * A reusable synchronization barrier, similar in functionality to
    45  * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
    46  * {@link java.util.concurrent.CountDownLatch CountDownLatch}
    47  * but supporting more flexible usage.
    48  *
    49  * <p> <b>Registration.</b> Unlike the case for other barriers, the
    50  * number of parties <em>registered</em> to synchronize on a phaser
    51  * may vary over time.  Tasks may be registered at any time (using
    52  * methods {@link #register}, {@link #bulkRegister}, or forms of
    53  * constructors establishing initial numbers of parties), and
    54  * optionally deregistered upon any arrival (using {@link
    55  * #arriveAndDeregister}).  As is the case with most basic
    56  * synchronization constructs, registration and deregistration affect
    57  * only internal counts; they do not establish any further internal
    58  * bookkeeping, so tasks cannot query whether they are registered.
    59  * (However, you can introduce such bookkeeping by subclassing this
    60  * class.)
    61  *
    62  * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
    63  * Phaser} may be repeatedly awaited.  Method {@link
    64  * #arriveAndAwaitAdvance} has effect analogous to {@link
    65  * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
    66  * generation of a phaser has an associated phase number. The phase
    67  * number starts at zero, and advances when all parties arrive at the
    68  * phaser, wrapping around to zero after reaching {@code
    69  * Integer.MAX_VALUE}. The use of phase numbers enables independent
    70  * control of actions upon arrival at a phaser and upon awaiting
    71  * others, via two kinds of methods that may be invoked by any
    72  * registered party:
    73  *
    74  * <ul>
    75  *
    76  *   <li> <b>Arrival.</b> Methods {@link #arrive} and
    77  *       {@link #arriveAndDeregister} record arrival.  These methods
    78  *       do not block, but return an associated <em>arrival phase
    79  *       number</em>; that is, the phase number of the phaser to which
    80  *       the arrival applied. When the final party for a given phase
    81  *       arrives, an optional action is performed and the phase
    82  *       advances.  These actions are performed by the party
    83  *       triggering a phase advance, and are arranged by overriding
    84  *       method {@link #onAdvance(int, int)}, which also controls
    85  *       termination. Overriding this method is similar to, but more
    86  *       flexible than, providing a barrier action to a {@code
    87  *       CyclicBarrier}.
    88  *
    89  *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
    90  *       argument indicating an arrival phase number, and returns when
    91  *       the phaser advances to (or is already at) a different phase.
    92  *       Unlike similar constructions using {@code CyclicBarrier},
    93  *       method {@code awaitAdvance} continues to wait even if the
    94  *       waiting thread is interrupted. Interruptible and timeout
    95  *       versions are also available, but exceptions encountered while
    96  *       tasks wait interruptibly or with timeout do not change the
    97  *       state of the phaser. If necessary, you can perform any
    98  *       associated recovery within handlers of those exceptions,
    99  *       often after invoking {@code forceTermination}.  Phasers may
   100  *       also be used by tasks executing in a {@link ForkJoinPool},
   101  *       which will ensure sufficient parallelism to execute tasks
   102  *       when others are blocked waiting for a phase to advance.
   103  *
   104  * </ul>
   105  *
   106  * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
   107  * state, that may be checked using method {@link #isTerminated}. Upon
   108  * termination, all synchronization methods immediately return without
   109  * waiting for advance, as indicated by a negative return value.
   110  * Similarly, attempts to register upon termination have no effect.
   111  * Termination is triggered when an invocation of {@code onAdvance}
   112  * returns {@code true}. The default implementation returns {@code
   113  * true} if a deregistration has caused the number of registered
   114  * parties to become zero.  As illustrated below, when phasers control
   115  * actions with a fixed number of iterations, it is often convenient
   116  * to override this method to cause termination when the current phase
   117  * number reaches a threshold. Method {@link #forceTermination} is
   118  * also available to abruptly release waiting threads and allow them
   119  * to terminate.
   120  *
   121  * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
   122  * constructed in tree structures) to reduce contention. Phasers with
   123  * large numbers of parties that would otherwise experience heavy
   124  * synchronization contention costs may instead be set up so that
   125  * groups of sub-phasers share a common parent.  This may greatly
   126  * increase throughput even though it incurs greater per-operation
   127  * overhead.
   128  *
   129  * <p>In a tree of tiered phasers, registration and deregistration of
   130  * child phasers with their parent are managed automatically.
   131  * Whenever the number of registered parties of a child phaser becomes
   132  * non-zero (as established in the {@link #Phaser(Phaser,int)}
   133  * constructor, {@link #register}, or {@link #bulkRegister}), the
   134  * child phaser is registered with its parent.  Whenever the number of
   135  * registered parties becomes zero as the result of an invocation of
   136  * {@link #arriveAndDeregister}, the child phaser is deregistered
   137  * from its parent.
   138  *
   139  * <p><b>Monitoring.</b> While synchronization methods may be invoked
   140  * only by registered parties, the current state of a phaser may be
   141  * monitored by any caller.  At any given moment there are {@link
   142  * #getRegisteredParties} parties in total, of which {@link
   143  * #getArrivedParties} have arrived at the current phase ({@link
   144  * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
   145  * parties arrive, the phase advances.  The values returned by these
   146  * methods may reflect transient states and so are not in general
   147  * useful for synchronization control.  Method {@link #toString}
   148  * returns snapshots of these state queries in a form convenient for
   149  * informal monitoring.
   150  *
   151  * <p><b>Sample usages:</b>
   152  *
   153  * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
   154  * to control a one-shot action serving a variable number of parties.
   155  * The typical idiom is for the method setting this up to first
   156  * register, then start the actions, then deregister, as in:
   157  *
   158  *  <pre> {@code
   159  * void runTasks(List<Runnable> tasks) {
   160  *   final Phaser phaser = new Phaser(1); // "1" to register self
   161  *   // create and start threads
   162  *   for (final Runnable task : tasks) {
   163  *     phaser.register();
   164  *     new Thread() {
   165  *       public void run() {
   166  *         phaser.arriveAndAwaitAdvance(); // await all creation
   167  *         task.run();
   168  *       }
   169  *     }.start();
   170  *   }
   171  *
   172  *   // allow threads to start and deregister self
   173  *   phaser.arriveAndDeregister();
   174  * }}</pre>
   175  *
   176  * <p>One way to cause a set of threads to repeatedly perform actions
   177  * for a given number of iterations is to override {@code onAdvance}:
   178  *
   179  *  <pre> {@code
   180  * void startTasks(List<Runnable> tasks, final int iterations) {
   181  *   final Phaser phaser = new Phaser() {
   182  *     protected boolean onAdvance(int phase, int registeredParties) {
   183  *       return phase >= iterations || registeredParties == 0;
   184  *     }
   185  *   };
   186  *   phaser.register();
   187  *   for (final Runnable task : tasks) {
   188  *     phaser.register();
   189  *     new Thread() {
   190  *       public void run() {
   191  *         do {
   192  *           task.run();
   193  *           phaser.arriveAndAwaitAdvance();
   194  *         } while (!phaser.isTerminated());
   195  *       }
   196  *     }.start();
   197  *   }
   198  *   phaser.arriveAndDeregister(); // deregister self, don't wait
   199  * }}</pre>
   200  *
   201  * If the main task must later await termination, it
   202  * may re-register and then execute a similar loop:
   203  *  <pre> {@code
   204  *   // ...
   205  *   phaser.register();
   206  *   while (!phaser.isTerminated())
   207  *     phaser.arriveAndAwaitAdvance();}</pre>
   208  *
   209  * <p>Related constructions may be used to await particular phase numbers
   210  * in contexts where you are sure that the phase will never wrap around
   211  * {@code Integer.MAX_VALUE}. For example:
   212  *
   213  *  <pre> {@code
   214  * void awaitPhase(Phaser phaser, int phase) {
   215  *   int p = phaser.register(); // assumes caller not already registered
   216  *   while (p < phase) {
   217  *     if (phaser.isTerminated())
   218  *       // ... deal with unexpected termination
   219  *     else
   220  *       p = phaser.arriveAndAwaitAdvance();
   221  *   }
   222  *   phaser.arriveAndDeregister();
   223  * }}</pre>
   224  *
   225  *
   226  * <p>To create a set of {@code n} tasks using a tree of phasers, you
   227  * could use code of the following form, assuming a Task class with a
   228  * constructor accepting a {@code Phaser} that it registers with upon
   229  * construction. After invocation of {@code build(new Task[n], 0, n,
   230  * new Phaser())}, these tasks could then be started, for example by
   231  * submitting to a pool:
   232  *
   233  *  <pre> {@code
   234  * void build(Task[] tasks, int lo, int hi, Phaser ph) {
   235  *   if (hi - lo > TASKS_PER_PHASER) {
   236  *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
   237  *       int j = Math.min(i + TASKS_PER_PHASER, hi);
   238  *       build(tasks, i, j, new Phaser(ph));
   239  *     }
   240  *   } else {
   241  *     for (int i = lo; i < hi; ++i)
   242  *       tasks[i] = new Task(ph);
   243  *       // assumes new Task(ph) performs ph.register()
   244  *   }
   245  * }}</pre>
   246  *
   247  * The best value of {@code TASKS_PER_PHASER} depends mainly on
   248  * expected synchronization rates. A value as low as four may
   249  * be appropriate for extremely small per-phase task bodies (thus
   250  * high rates), or up to hundreds for extremely large ones.
   251  *
   252  * <p><b>Implementation notes</b>: This implementation restricts the
   253  * maximum number of parties to 65535. Attempts to register additional
   254  * parties result in {@code IllegalStateException}. However, you can and
   255  * should create tiered phasers to accommodate arbitrarily large sets
   256  * of participants.
   257  *
   258  * @since 1.7
   259  * @author Doug Lea
   260  */
   261 public class Phaser {
   262     /*
   263      * This class implements an extension of X10 "clocks".  Thanks to
   264      * Vijay Saraswat for the idea, and to Vivek Sarkar for
   265      * enhancements to extend functionality.
   266      */
   267 
   268     /**
   269      * Primary state representation, holding four bit-fields:
   270      *
   271      * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
   272      * parties    -- the number of parties to wait            (bits 16-31)
   273      * phase      -- the generation of the barrier            (bits 32-62)
   274      * terminated -- set if barrier is terminated             (bit  63 / sign)
   275      *
   276      * Except that a phaser with no registered parties is
   277      * distinguished by the otherwise illegal state of having zero
   278      * parties and one unarrived parties (encoded as EMPTY below).
   279      *
   280      * To efficiently maintain atomicity, these values are packed into
   281      * a single (atomic) long. Good performance relies on keeping
   282      * state decoding and encoding simple, and keeping race windows
   283      * short.
   284      *
   285      * All state updates are performed via CAS except initial
   286      * registration of a sub-phaser (i.e., one with a non-null
   287      * parent).  In this (relatively rare) case, we use built-in
   288      * synchronization to lock while first registering with its
   289      * parent.
   290      *
   291      * The phase of a subphaser is allowed to lag that of its
   292      * ancestors until it is actually accessed -- see method
   293      * reconcileState.
   294      */
   295     private volatile long state;
   296 
   297     private static final int  MAX_PARTIES     = 0xffff;
   298     private static final int  MAX_PHASE       = Integer.MAX_VALUE;
   299     private static final int  PARTIES_SHIFT   = 16;
   300     private static final int  PHASE_SHIFT     = 32;
   301     private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
   302     private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
   303     private static final long TERMINATION_BIT = 1L << 63;
   304 
   305     // some special values
   306     private static final int  ONE_ARRIVAL     = 1;
   307     private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
   308     private static final int  EMPTY           = 1;
   309 
   310     // The following unpacking methods are usually manually inlined
   311 
   312     private static int unarrivedOf(long s) {
   313         int counts = (int)s;
   314         return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
   315     }
   316 
   317     private static int partiesOf(long s) {
   318         return (int)s >>> PARTIES_SHIFT;
   319     }
   320 
   321     private static int phaseOf(long s) {
   322         return (int)(s >>> PHASE_SHIFT);
   323     }
   324 
   325     private static int arrivedOf(long s) {
   326         int counts = (int)s;
   327         return (counts == EMPTY) ? 0 :
   328             (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
   329     }
   330 
   331     /**
   332      * The parent of this phaser, or null if none
   333      */
   334     private final Phaser parent;
   335 
   336     /**
   337      * The root of phaser tree. Equals this if not in a tree.
   338      */
   339     private final Phaser root;
   340 
   341     /**
   342      * Heads of Treiber stacks for waiting threads. To eliminate
   343      * contention when releasing some threads while adding others, we
   344      * use two of them, alternating across even and odd phases.
   345      * Subphasers share queues with root to speed up releases.
   346      */
   347     private final AtomicReference<QNode> evenQ;
   348     private final AtomicReference<QNode> oddQ;
   349 
   350     private AtomicReference<QNode> queueFor(int phase) {
   351         return ((phase & 1) == 0) ? evenQ : oddQ;
   352     }
   353 
   354     /**
   355      * Returns message string for bounds exceptions on arrival.
   356      */
   357     private String badArrive(long s) {
   358         return "Attempted arrival of unregistered party for " +
   359             stateToString(s);
   360     }
   361 
   362     /**
   363      * Returns message string for bounds exceptions on registration.
   364      */
   365     private String badRegister(long s) {
   366         return "Attempt to register more than " +
   367             MAX_PARTIES + " parties for " + stateToString(s);
   368     }
   369 
   370     /**
   371      * Main implementation for methods arrive and arriveAndDeregister.
   372      * Manually tuned to speed up and minimize race windows for the
   373      * common case of just decrementing unarrived field.
   374      *
   375      * @param deregister false for arrive, true for arriveAndDeregister
   376      */
   377     private int doArrive(boolean deregister) {
   378         int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
   379         final Phaser root = this.root;
   380         for (;;) {
   381             long s = (root == this) ? state : reconcileState();
   382             int phase = (int)(s >>> PHASE_SHIFT);
   383             int counts = (int)s;
   384             int unarrived = (counts & UNARRIVED_MASK) - 1;
   385             if (phase < 0)
   386                 return phase;
   387             else if (counts == EMPTY || unarrived < 0) {
   388                 if (root == this || reconcileState() == s)
   389                     throw new IllegalStateException(badArrive(s));
   390             }
   391             else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
   392                 if (unarrived == 0) {
   393                     long n = s & PARTIES_MASK;  // base of next state
   394                     int nextUnarrived = (int)n >>> PARTIES_SHIFT;
   395                     if (root != this)
   396                         return parent.doArrive(nextUnarrived == 0);
   397                     if (onAdvance(phase, nextUnarrived))
   398                         n |= TERMINATION_BIT;
   399                     else if (nextUnarrived == 0)
   400                         n |= EMPTY;
   401                     else
   402                         n |= nextUnarrived;
   403                     n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
   404                     UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
   405                     releaseWaiters(phase);
   406                 }
   407                 return phase;
   408             }
   409         }
   410     }
   411 
   412     /**
   413      * Implementation of register, bulkRegister
   414      *
   415      * @param registrations number to add to both parties and
   416      * unarrived fields. Must be greater than zero.
   417      */
   418     private int doRegister(int registrations) {
   419         // adjustment to state
   420         long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
   421         final Phaser parent = this.parent;
   422         int phase;
   423         for (;;) {
   424             long s = state;
   425             int counts = (int)s;
   426             int parties = counts >>> PARTIES_SHIFT;
   427             int unarrived = counts & UNARRIVED_MASK;
   428             if (registrations > MAX_PARTIES - parties)
   429                 throw new IllegalStateException(badRegister(s));
   430             else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
   431                 break;
   432             else if (counts != EMPTY) {             // not 1st registration
   433                 if (parent == null || reconcileState() == s) {
   434                     if (unarrived == 0)             // wait out advance
   435                         root.internalAwaitAdvance(phase, null);
   436                     else if (UNSAFE.compareAndSwapLong(this, stateOffset,
   437                                                        s, s + adj))
   438                         break;
   439                 }
   440             }
   441             else if (parent == null) {              // 1st root registration
   442                 long next = ((long)phase << PHASE_SHIFT) | adj;
   443                 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
   444                     break;
   445             }
   446             else {
   447                 synchronized (this) {               // 1st sub registration
   448                     if (state == s) {               // recheck under lock
   449                         parent.doRegister(1);
   450                         do {                        // force current phase
   451                             phase = (int)(root.state >>> PHASE_SHIFT);
   452                             // assert phase < 0 || (int)state == EMPTY;
   453                         } while (!UNSAFE.compareAndSwapLong
   454                                  (this, stateOffset, state,
   455                                   ((long)phase << PHASE_SHIFT) | adj));
   456                         break;
   457                     }
   458                 }
   459             }
   460         }
   461         return phase;
   462     }
   463 
   464     /**
   465      * Resolves lagged phase propagation from root if necessary.
   466      * Reconciliation normally occurs when root has advanced but
   467      * subphasers have not yet done so, in which case they must finish
   468      * their own advance by setting unarrived to parties (or if
   469      * parties is zero, resetting to unregistered EMPTY state).
   470      * However, this method may also be called when "floating"
   471      * subphasers with possibly some unarrived parties are merely
   472      * catching up to current phase, in which case counts are
   473      * unaffected.
   474      *
   475      * @return reconciled state
   476      */
   477     private long reconcileState() {
   478         final Phaser root = this.root;
   479         long s = state;
   480         if (root != this) {
   481             int phase, u, p;
   482             // CAS root phase with current parties; possibly trip unarrived
   483             while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
   484                    (int)(s >>> PHASE_SHIFT) &&
   485                    !UNSAFE.compareAndSwapLong
   486                    (this, stateOffset, s,
   487                     s = (((long)phase << PHASE_SHIFT) |
   488                          (s & PARTIES_MASK) |
   489                          ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
   490                           (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
   491                 s = state;
   492         }
   493         return s;
   494     }
   495 
   496     /**
   497      * Creates a new phaser with no initially registered parties, no
   498      * parent, and initial phase number 0. Any thread using this
   499      * phaser will need to first register for it.
   500      */
   501     public Phaser() {
   502         this(null, 0);
   503     }
   504 
   505     /**
   506      * Creates a new phaser with the given number of registered
   507      * unarrived parties, no parent, and initial phase number 0.
   508      *
   509      * @param parties the number of parties required to advance to the
   510      * next phase
   511      * @throws IllegalArgumentException if parties less than zero
   512      * or greater than the maximum number of parties supported
   513      */
   514     public Phaser(int parties) {
   515         this(null, parties);
   516     }
   517 
   518     /**
   519      * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
   520      *
   521      * @param parent the parent phaser
   522      */
   523     public Phaser(Phaser parent) {
   524         this(parent, 0);
   525     }
   526 
   527     /**
   528      * Creates a new phaser with the given parent and number of
   529      * registered unarrived parties.  When the given parent is non-null
   530      * and the given number of parties is greater than zero, this
   531      * child phaser is registered with its parent.
   532      *
   533      * @param parent the parent phaser
   534      * @param parties the number of parties required to advance to the
   535      * next phase
   536      * @throws IllegalArgumentException if parties less than zero
   537      * or greater than the maximum number of parties supported
   538      */
   539     public Phaser(Phaser parent, int parties) {
   540         if (parties >>> PARTIES_SHIFT != 0)
   541             throw new IllegalArgumentException("Illegal number of parties");
   542         int phase = 0;
   543         this.parent = parent;
   544         if (parent != null) {
   545             final Phaser root = parent.root;
   546             this.root = root;
   547             this.evenQ = root.evenQ;
   548             this.oddQ = root.oddQ;
   549             if (parties != 0)
   550                 phase = parent.doRegister(1);
   551         }
   552         else {
   553             this.root = this;
   554             this.evenQ = new AtomicReference<QNode>();
   555             this.oddQ = new AtomicReference<QNode>();
   556         }
   557         this.state = (parties == 0) ? (long)EMPTY :
   558             ((long)phase << PHASE_SHIFT) |
   559             ((long)parties << PARTIES_SHIFT) |
   560             ((long)parties);
   561     }
   562 
   563     /**
   564      * Adds a new unarrived party to this phaser.  If an ongoing
   565      * invocation of {@link #onAdvance} is in progress, this method
   566      * may await its completion before returning.  If this phaser has
   567      * a parent, and this phaser previously had no registered parties,
   568      * this child phaser is also registered with its parent. If
   569      * this phaser is terminated, the attempt to register has
   570      * no effect, and a negative value is returned.
   571      *
   572      * @return the arrival phase number to which this registration
   573      * applied.  If this value is negative, then this phaser has
   574      * terminated, in which case registration has no effect.
   575      * @throws IllegalStateException if attempting to register more
   576      * than the maximum supported number of parties
   577      */
   578     public int register() {
   579         return doRegister(1);
   580     }
   581 
   582     /**
   583      * Adds the given number of new unarrived parties to this phaser.
   584      * If an ongoing invocation of {@link #onAdvance} is in progress,
   585      * this method may await its completion before returning.  If this
   586      * phaser has a parent, and the given number of parties is greater
   587      * than zero, and this phaser previously had no registered
   588      * parties, this child phaser is also registered with its parent.
   589      * If this phaser is terminated, the attempt to register has no
   590      * effect, and a negative value is returned.
   591      *
   592      * @param parties the number of additional parties required to
   593      * advance to the next phase
   594      * @return the arrival phase number to which this registration
   595      * applied.  If this value is negative, then this phaser has
   596      * terminated, in which case registration has no effect.
   597      * @throws IllegalStateException if attempting to register more
   598      * than the maximum supported number of parties
   599      * @throws IllegalArgumentException if {@code parties < 0}
   600      */
   601     public int bulkRegister(int parties) {
   602         if (parties < 0)
   603             throw new IllegalArgumentException();
   604         if (parties == 0)
   605             return getPhase();
   606         return doRegister(parties);
   607     }
   608 
   609     /**
   610      * Arrives at this phaser, without waiting for others to arrive.
   611      *
   612      * <p>It is a usage error for an unregistered party to invoke this
   613      * method.  However, this error may result in an {@code
   614      * IllegalStateException} only upon some subsequent operation on
   615      * this phaser, if ever.
   616      *
   617      * @return the arrival phase number, or a negative value if terminated
   618      * @throws IllegalStateException if not terminated and the number
   619      * of unarrived parties would become negative
   620      */
   621     public int arrive() {
   622         return doArrive(false);
   623     }
   624 
   625     /**
   626      * Arrives at this phaser and deregisters from it without waiting
   627      * for others to arrive. Deregistration reduces the number of
   628      * parties required to advance in future phases.  If this phaser
   629      * has a parent, and deregistration causes this phaser to have
   630      * zero parties, this phaser is also deregistered from its parent.
   631      *
   632      * <p>It is a usage error for an unregistered party to invoke this
   633      * method.  However, this error may result in an {@code
   634      * IllegalStateException} only upon some subsequent operation on
   635      * this phaser, if ever.
   636      *
   637      * @return the arrival phase number, or a negative value if terminated
   638      * @throws IllegalStateException if not terminated and the number
   639      * of registered or unarrived parties would become negative
   640      */
   641     public int arriveAndDeregister() {
   642         return doArrive(true);
   643     }
   644 
   645     /**
   646      * Arrives at this phaser and awaits others. Equivalent in effect
   647      * to {@code awaitAdvance(arrive())}.  If you need to await with
   648      * interruption or timeout, you can arrange this with an analogous
   649      * construction using one of the other forms of the {@code
   650      * awaitAdvance} method.  If instead you need to deregister upon
   651      * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
   652      *
   653      * <p>It is a usage error for an unregistered party to invoke this
   654      * method.  However, this error may result in an {@code
   655      * IllegalStateException} only upon some subsequent operation on
   656      * this phaser, if ever.
   657      *
   658      * @return the arrival phase number, or the (negative)
   659      * {@linkplain #getPhase() current phase} if terminated
   660      * @throws IllegalStateException if not terminated and the number
   661      * of unarrived parties would become negative
   662      */
   663     public int arriveAndAwaitAdvance() {
   664         // Specialization of doArrive+awaitAdvance eliminating some reads/paths
   665         final Phaser root = this.root;
   666         for (;;) {
   667             long s = (root == this) ? state : reconcileState();
   668             int phase = (int)(s >>> PHASE_SHIFT);
   669             int counts = (int)s;
   670             int unarrived = (counts & UNARRIVED_MASK) - 1;
   671             if (phase < 0)
   672                 return phase;
   673             else if (counts == EMPTY || unarrived < 0) {
   674                 if (reconcileState() == s)
   675                     throw new IllegalStateException(badArrive(s));
   676             }
   677             else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
   678                                                s -= ONE_ARRIVAL)) {
   679                 if (unarrived != 0)
   680                     return root.internalAwaitAdvance(phase, null);
   681                 if (root != this)
   682                     return parent.arriveAndAwaitAdvance();
   683                 long n = s & PARTIES_MASK;  // base of next state
   684                 int nextUnarrived = (int)n >>> PARTIES_SHIFT;
   685                 if (onAdvance(phase, nextUnarrived))
   686                     n |= TERMINATION_BIT;
   687                 else if (nextUnarrived == 0)
   688                     n |= EMPTY;
   689                 else
   690                     n |= nextUnarrived;
   691                 int nextPhase = (phase + 1) & MAX_PHASE;
   692                 n |= (long)nextPhase << PHASE_SHIFT;
   693                 if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
   694                     return (int)(state >>> PHASE_SHIFT); // terminated
   695                 releaseWaiters(phase);
   696                 return nextPhase;
   697             }
   698         }
   699     }
   700 
   701     /**
   702      * Awaits the phase of this phaser to advance from the given phase
   703      * value, returning immediately if the current phase is not equal
   704      * to the given phase value or this phaser is terminated.
   705      *
   706      * @param phase an arrival phase number, or negative value if
   707      * terminated; this argument is normally the value returned by a
   708      * previous call to {@code arrive} or {@code arriveAndDeregister}.
   709      * @return the next arrival phase number, or the argument if it is
   710      * negative, or the (negative) {@linkplain #getPhase() current phase}
   711      * if terminated
   712      */
   713     public int awaitAdvance(int phase) {
   714         final Phaser root = this.root;
   715         long s = (root == this) ? state : reconcileState();
   716         int p = (int)(s >>> PHASE_SHIFT);
   717         if (phase < 0)
   718             return phase;
   719         if (p == phase)
   720             return root.internalAwaitAdvance(phase, null);
   721         return p;
   722     }
   723 
   724     /**
   725      * Awaits the phase of this phaser to advance from the given phase
   726      * value, throwing {@code InterruptedException} if interrupted
   727      * while waiting, or returning immediately if the current phase is
   728      * not equal to the given phase value or this phaser is
   729      * terminated.
   730      *
   731      * @param phase an arrival phase number, or negative value if
   732      * terminated; this argument is normally the value returned by a
   733      * previous call to {@code arrive} or {@code arriveAndDeregister}.
   734      * @return the next arrival phase number, or the argument if it is
   735      * negative, or the (negative) {@linkplain #getPhase() current phase}
   736      * if terminated
   737      * @throws InterruptedException if thread interrupted while waiting
   738      */
   739     public int awaitAdvanceInterruptibly(int phase)
   740         throws InterruptedException {
   741         final Phaser root = this.root;
   742         long s = (root == this) ? state : reconcileState();
   743         int p = (int)(s >>> PHASE_SHIFT);
   744         if (phase < 0)
   745             return phase;
   746         if (p == phase) {
   747             QNode node = new QNode(this, phase, true, false, 0L);
   748             p = root.internalAwaitAdvance(phase, node);
   749             if (node.wasInterrupted)
   750                 throw new InterruptedException();
   751         }
   752         return p;
   753     }
   754 
   755     /**
   756      * Awaits the phase of this phaser to advance from the given phase
   757      * value or the given timeout to elapse, throwing {@code
   758      * InterruptedException} if interrupted while waiting, or
   759      * returning immediately if the current phase is not equal to the
   760      * given phase value or this phaser is terminated.
   761      *
   762      * @param phase an arrival phase number, or negative value if
   763      * terminated; this argument is normally the value returned by a
   764      * previous call to {@code arrive} or {@code arriveAndDeregister}.
   765      * @param timeout how long to wait before giving up, in units of
   766      *        {@code unit}
   767      * @param unit a {@code TimeUnit} determining how to interpret the
   768      *        {@code timeout} parameter
   769      * @return the next arrival phase number, or the argument if it is
   770      * negative, or the (negative) {@linkplain #getPhase() current phase}
   771      * if terminated
   772      * @throws InterruptedException if thread interrupted while waiting
   773      * @throws TimeoutException if timed out while waiting
   774      */
   775     public int awaitAdvanceInterruptibly(int phase,
   776                                          long timeout, TimeUnit unit)
   777         throws InterruptedException, TimeoutException {
   778         long nanos = unit.toNanos(timeout);
   779         final Phaser root = this.root;
   780         long s = (root == this) ? state : reconcileState();
   781         int p = (int)(s >>> PHASE_SHIFT);
   782         if (phase < 0)
   783             return phase;
   784         if (p == phase) {
   785             QNode node = new QNode(this, phase, true, true, nanos);
   786             p = root.internalAwaitAdvance(phase, node);
   787             if (node.wasInterrupted)
   788                 throw new InterruptedException();
   789             else if (p == phase)
   790                 throw new TimeoutException();
   791         }
   792         return p;
   793     }
   794 
   795     /**
   796      * Forces this phaser to enter termination state.  Counts of
   797      * registered parties are unaffected.  If this phaser is a member
   798      * of a tiered set of phasers, then all of the phasers in the set
   799      * are terminated.  If this phaser is already terminated, this
   800      * method has no effect.  This method may be useful for
   801      * coordinating recovery after one or more tasks encounter
   802      * unexpected exceptions.
   803      */
   804     public void forceTermination() {
   805         // Only need to change root state
   806         final Phaser root = this.root;
   807         long s;
   808         while ((s = root.state) >= 0) {
   809             if (UNSAFE.compareAndSwapLong(root, stateOffset,
   810                                           s, s | TERMINATION_BIT)) {
   811                 // signal all threads
   812                 releaseWaiters(0);
   813                 releaseWaiters(1);
   814                 return;
   815             }
   816         }
   817     }
   818 
   819     /**
   820      * Returns the current phase number. The maximum phase number is
   821      * {@code Integer.MAX_VALUE}, after which it restarts at
   822      * zero. Upon termination, the phase number is negative,
   823      * in which case the prevailing phase prior to termination
   824      * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
   825      *
   826      * @return the phase number, or a negative value if terminated
   827      */
   828     public final int getPhase() {
   829         return (int)(root.state >>> PHASE_SHIFT);
   830     }
   831 
   832     /**
   833      * Returns the number of parties registered at this phaser.
   834      *
   835      * @return the number of parties
   836      */
   837     public int getRegisteredParties() {
   838         return partiesOf(state);
   839     }
   840 
   841     /**
   842      * Returns the number of registered parties that have arrived at
   843      * the current phase of this phaser. If this phaser has terminated,
   844      * the returned value is meaningless and arbitrary.
   845      *
   846      * @return the number of arrived parties
   847      */
   848     public int getArrivedParties() {
   849         return arrivedOf(reconcileState());
   850     }
   851 
   852     /**
   853      * Returns the number of registered parties that have not yet
   854      * arrived at the current phase of this phaser. If this phaser has
   855      * terminated, the returned value is meaningless and arbitrary.
   856      *
   857      * @return the number of unarrived parties
   858      */
   859     public int getUnarrivedParties() {
   860         return unarrivedOf(reconcileState());
   861     }
   862 
   863     /**
   864      * Returns the parent of this phaser, or {@code null} if none.
   865      *
   866      * @return the parent of this phaser, or {@code null} if none
   867      */
   868     public Phaser getParent() {
   869         return parent;
   870     }
   871 
   872     /**
   873      * Returns the root ancestor of this phaser, which is the same as
   874      * this phaser if it has no parent.
   875      *
   876      * @return the root ancestor of this phaser
   877      */
   878     public Phaser getRoot() {
   879         return root;
   880     }
   881 
   882     /**
   883      * Returns {@code true} if this phaser has been terminated.
   884      *
   885      * @return {@code true} if this phaser has been terminated
   886      */
   887     public boolean isTerminated() {
   888         return root.state < 0L;
   889     }
   890 
   891     /**
   892      * Overridable method to perform an action upon impending phase
   893      * advance, and to control termination. This method is invoked
   894      * upon arrival of the party advancing this phaser (when all other
   895      * waiting parties are dormant).  If this method returns {@code
   896      * true}, this phaser will be set to a final termination state
   897      * upon advance, and subsequent calls to {@link #isTerminated}
   898      * will return true. Any (unchecked) Exception or Error thrown by
   899      * an invocation of this method is propagated to the party
   900      * attempting to advance this phaser, in which case no advance
   901      * occurs.
   902      *
   903      * <p>The arguments to this method provide the state of the phaser
   904      * prevailing for the current transition.  The effects of invoking
   905      * arrival, registration, and waiting methods on this phaser from
   906      * within {@code onAdvance} are unspecified and should not be
   907      * relied on.
   908      *
   909      * <p>If this phaser is a member of a tiered set of phasers, then
   910      * {@code onAdvance} is invoked only for its root phaser on each
   911      * advance.
   912      *
   913      * <p>To support the most common use cases, the default
   914      * implementation of this method returns {@code true} when the
   915      * number of registered parties has become zero as the result of a
   916      * party invoking {@code arriveAndDeregister}.  You can disable
   917      * this behavior, thus enabling continuation upon future
   918      * registrations, by overriding this method to always return
   919      * {@code false}:
   920      *
   921      * <pre> {@code
   922      * Phaser phaser = new Phaser() {
   923      *   protected boolean onAdvance(int phase, int parties) { return false; }
   924      * }}</pre>
   925      *
   926      * @param phase the current phase number on entry to this method,
   927      * before this phaser is advanced
   928      * @param registeredParties the current number of registered parties
   929      * @return {@code true} if this phaser should terminate
   930      */
   931     protected boolean onAdvance(int phase, int registeredParties) {
   932         return registeredParties == 0;
   933     }
   934 
   935     /**
   936      * Returns a string identifying this phaser, as well as its
   937      * state.  The state, in brackets, includes the String {@code
   938      * "phase = "} followed by the phase number, {@code "parties = "}
   939      * followed by the number of registered parties, and {@code
   940      * "arrived = "} followed by the number of arrived parties.
   941      *
   942      * @return a string identifying this phaser, as well as its state
   943      */
   944     public String toString() {
   945         return stateToString(reconcileState());
   946     }
   947 
   948     /**
   949      * Implementation of toString and string-based error messages
   950      */
   951     private String stateToString(long s) {
   952         return super.toString() +
   953             "[phase = " + phaseOf(s) +
   954             " parties = " + partiesOf(s) +
   955             " arrived = " + arrivedOf(s) + "]";
   956     }
   957 
   958     // Waiting mechanics
   959 
   960     /**
   961      * Removes and signals threads from queue for phase.
   962      */
   963     private void releaseWaiters(int phase) {
   964         QNode q;   // first element of queue
   965         Thread t;  // its thread
   966         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
   967         while ((q = head.get()) != null &&
   968                q.phase != (int)(root.state >>> PHASE_SHIFT)) {
   969             if (head.compareAndSet(q, q.next) &&
   970                 (t = q.thread) != null) {
   971                 q.thread = null;
   972                 LockSupport.unpark(t);
   973             }
   974         }
   975     }
   976 
   977     /**
   978      * Variant of releaseWaiters that additionally tries to remove any
   979      * nodes no longer waiting for advance due to timeout or
   980      * interrupt. Currently, nodes are removed only if they are at
   981      * head of queue, which suffices to reduce memory footprint in
   982      * most usages.
   983      *
   984      * @return current phase on exit
   985      */
   986     private int abortWait(int phase) {
   987         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
   988         for (;;) {
   989             Thread t;
   990             QNode q = head.get();
   991             int p = (int)(root.state >>> PHASE_SHIFT);
   992             if (q == null || ((t = q.thread) != null && q.phase == p))
   993                 return p;
   994             if (head.compareAndSet(q, q.next) && t != null) {
   995                 q.thread = null;
   996                 LockSupport.unpark(t);
   997             }
   998         }
   999     }
  1000 
  1001     /** The number of CPUs, for spin control */
  1002     private static final int NCPU = Runtime.getRuntime().availableProcessors();
  1003 
  1004     /**
  1005      * The number of times to spin before blocking while waiting for
  1006      * advance, per arrival while waiting. On multiprocessors, fully
  1007      * blocking and waking up a large number of threads all at once is
  1008      * usually a very slow process, so we use rechargeable spins to
  1009      * avoid it when threads regularly arrive: When a thread in
  1010      * internalAwaitAdvance notices another arrival before blocking,
  1011      * and there appear to be enough CPUs available, it spins
  1012      * SPINS_PER_ARRIVAL more times before blocking. The value trades
  1013      * off good-citizenship vs big unnecessary slowdowns.
  1014      */
  1015     static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
  1016 
  1017     /**
  1018      * Possibly blocks and waits for phase to advance unless aborted.
  1019      * Call only from root node.
  1020      *
  1021      * @param phase current phase
  1022      * @param node if non-null, the wait node to track interrupt and timeout;
  1023      * if null, denotes noninterruptible wait
  1024      * @return current phase
  1025      */
  1026     private int internalAwaitAdvance(int phase, QNode node) {
  1027         releaseWaiters(phase-1);          // ensure old queue clean
  1028         boolean queued = false;           // true when node is enqueued
  1029         int lastUnarrived = 0;            // to increase spins upon change
  1030         int spins = SPINS_PER_ARRIVAL;
  1031         long s;
  1032         int p;
  1033         while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
  1034             if (node == null) {           // spinning in noninterruptible mode
  1035                 int unarrived = (int)s & UNARRIVED_MASK;
  1036                 if (unarrived != lastUnarrived &&
  1037                     (lastUnarrived = unarrived) < NCPU)
  1038                     spins += SPINS_PER_ARRIVAL;
  1039                 boolean interrupted = Thread.interrupted();
  1040                 if (interrupted || --spins < 0) { // need node to record intr
  1041                     node = new QNode(this, phase, false, false, 0L);
  1042                     node.wasInterrupted = interrupted;
  1043                 }
  1044             }
  1045             else if (node.isReleasable()) // done or aborted
  1046                 break;
  1047             else if (!queued) {           // push onto queue
  1048                 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
  1049                 QNode q = node.next = head.get();
  1050                 if ((q == null || q.phase == phase) &&
  1051                     (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
  1052                     queued = head.compareAndSet(q, node);
  1053             }
  1054             else {
  1055                 try {
  1056                     ForkJoinPool.managedBlock(node);
  1057                 } catch (InterruptedException ie) {
  1058                     node.wasInterrupted = true;
  1059                 }
  1060             }
  1061         }
  1062 
  1063         if (node != null) {
  1064             if (node.thread != null)
  1065                 node.thread = null;       // avoid need for unpark()
  1066             if (node.wasInterrupted && !node.interruptible)
  1067                 Thread.currentThread().interrupt();
  1068             if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
  1069                 return abortWait(phase); // possibly clean up on abort
  1070         }
  1071         releaseWaiters(phase);
  1072         return p;
  1073     }
  1074 
  1075     /**
  1076      * Wait nodes for Treiber stack representing wait queue
  1077      */
  1078     static final class QNode implements ForkJoinPool.ManagedBlocker {
  1079         final Phaser phaser;
  1080         final int phase;
  1081         final boolean interruptible;
  1082         final boolean timed;
  1083         boolean wasInterrupted;
  1084         long nanos;
  1085         long lastTime;
  1086         volatile Thread thread; // nulled to cancel wait
  1087         QNode next;
  1088 
  1089         QNode(Phaser phaser, int phase, boolean interruptible,
  1090               boolean timed, long nanos) {
  1091             this.phaser = phaser;
  1092             this.phase = phase;
  1093             this.interruptible = interruptible;
  1094             this.nanos = nanos;
  1095             this.timed = timed;
  1096             this.lastTime = timed ? System.nanoTime() : 0L;
  1097             thread = Thread.currentThread();
  1098         }
  1099 
  1100         public boolean isReleasable() {
  1101             if (thread == null)
  1102                 return true;
  1103             if (phaser.getPhase() != phase) {
  1104                 thread = null;
  1105                 return true;
  1106             }
  1107             if (Thread.interrupted())
  1108                 wasInterrupted = true;
  1109             if (wasInterrupted && interruptible) {
  1110                 thread = null;
  1111                 return true;
  1112             }
  1113             if (timed) {
  1114                 if (nanos > 0L) {
  1115                     long now = System.nanoTime();
  1116                     nanos -= now - lastTime;
  1117                     lastTime = now;
  1118                 }
  1119                 if (nanos <= 0L) {
  1120                     thread = null;
  1121                     return true;
  1122                 }
  1123             }
  1124             return false;
  1125         }
  1126 
  1127         public boolean block() {
  1128             if (isReleasable())
  1129                 return true;
  1130             else if (!timed)
  1131                 LockSupport.park(this);
  1132             else if (nanos > 0)
  1133                 LockSupport.parkNanos(this, nanos);
  1134             return isReleasable();
  1135         }
  1136     }
  1137 
  1138     // Unsafe mechanics
  1139 
  1140     private static final sun.misc.Unsafe UNSAFE;
  1141     private static final long stateOffset;
  1142     static {
  1143         try {
  1144             UNSAFE = sun.misc.Unsafe.getUnsafe();
  1145             Class k = Phaser.class;
  1146             stateOffset = UNSAFE.objectFieldOffset
  1147                 (k.getDeclaredField("state"));
  1148         } catch (Exception e) {
  1149             throw new Error(e);
  1150         }
  1151     }
  1152 }