rt/emul/compact/src/main/java/java/util/concurrent/Phaser.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 12:51:03 +0100
changeset 1895 bfaf3300b7ba
parent 1890 212417b74b72
permissions -rw-r--r--
Making java.util.concurrent package compilable except ForkJoinPool
jaroslav@1890
     1
/*
jaroslav@1890
     2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
jaroslav@1890
     3
 *
jaroslav@1890
     4
 * This code is free software; you can redistribute it and/or modify it
jaroslav@1890
     5
 * under the terms of the GNU General Public License version 2 only, as
jaroslav@1890
     6
 * published by the Free Software Foundation.  Oracle designates this
jaroslav@1890
     7
 * particular file as subject to the "Classpath" exception as provided
jaroslav@1890
     8
 * by Oracle in the LICENSE file that accompanied this code.
jaroslav@1890
     9
 *
jaroslav@1890
    10
 * This code is distributed in the hope that it will be useful, but WITHOUT
jaroslav@1890
    11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
jaroslav@1890
    12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
jaroslav@1890
    13
 * version 2 for more details (a copy is included in the LICENSE file that
jaroslav@1890
    14
 * accompanied this code).
jaroslav@1890
    15
 *
jaroslav@1890
    16
 * You should have received a copy of the GNU General Public License version
jaroslav@1890
    17
 * 2 along with this work; if not, write to the Free Software Foundation,
jaroslav@1890
    18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
jaroslav@1890
    19
 *
jaroslav@1890
    20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
jaroslav@1890
    21
 * or visit www.oracle.com if you need additional information or have any
jaroslav@1890
    22
 * questions.
jaroslav@1890
    23
 */
jaroslav@1890
    24
jaroslav@1890
    25
/*
jaroslav@1890
    26
 * This file is available under and governed by the GNU General Public
jaroslav@1890
    27
 * License version 2 only, as published by the Free Software Foundation.
jaroslav@1890
    28
 * However, the following notice accompanied the original version of this
jaroslav@1890
    29
 * file:
jaroslav@1890
    30
 *
jaroslav@1890
    31
 * Written by Doug Lea with assistance from members of JCP JSR-166
jaroslav@1890
    32
 * Expert Group and released to the public domain, as explained at
jaroslav@1890
    33
 * http://creativecommons.org/publicdomain/zero/1.0/
jaroslav@1890
    34
 */
jaroslav@1890
    35
jaroslav@1890
    36
package java.util.concurrent;
jaroslav@1890
    37
jaroslav@1890
    38
import java.util.concurrent.TimeUnit;
jaroslav@1890
    39
import java.util.concurrent.TimeoutException;
jaroslav@1890
    40
import java.util.concurrent.atomic.AtomicReference;
jaroslav@1890
    41
import java.util.concurrent.locks.LockSupport;
jaroslav@1890
    42
jaroslav@1890
    43
/**
jaroslav@1890
    44
 * A reusable synchronization barrier, similar in functionality to
jaroslav@1890
    45
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
jaroslav@1890
    46
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
jaroslav@1890
    47
 * but supporting more flexible usage.
jaroslav@1890
    48
 *
jaroslav@1890
    49
 * <p> <b>Registration.</b> Unlike the case for other barriers, the
jaroslav@1890
    50
 * number of parties <em>registered</em> to synchronize on a phaser
jaroslav@1890
    51
 * may vary over time.  Tasks may be registered at any time (using
jaroslav@1890
    52
 * methods {@link #register}, {@link #bulkRegister}, or forms of
jaroslav@1890
    53
 * constructors establishing initial numbers of parties), and
jaroslav@1890
    54
 * optionally deregistered upon any arrival (using {@link
jaroslav@1890
    55
 * #arriveAndDeregister}).  As is the case with most basic
jaroslav@1890
    56
 * synchronization constructs, registration and deregistration affect
jaroslav@1890
    57
 * only internal counts; they do not establish any further internal
jaroslav@1890
    58
 * bookkeeping, so tasks cannot query whether they are registered.
jaroslav@1890
    59
 * (However, you can introduce such bookkeeping by subclassing this
jaroslav@1890
    60
 * class.)
jaroslav@1890
    61
 *
jaroslav@1890
    62
 * <p> <b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
jaroslav@1890
    63
 * Phaser} may be repeatedly awaited.  Method {@link
jaroslav@1890
    64
 * #arriveAndAwaitAdvance} has effect analogous to {@link
jaroslav@1890
    65
 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
jaroslav@1890
    66
 * generation of a phaser has an associated phase number. The phase
jaroslav@1890
    67
 * number starts at zero, and advances when all parties arrive at the
jaroslav@1890
    68
 * phaser, wrapping around to zero after reaching {@code
jaroslav@1890
    69
 * Integer.MAX_VALUE}. The use of phase numbers enables independent
jaroslav@1890
    70
 * control of actions upon arrival at a phaser and upon awaiting
jaroslav@1890
    71
 * others, via two kinds of methods that may be invoked by any
jaroslav@1890
    72
 * registered party:
jaroslav@1890
    73
 *
jaroslav@1890
    74
 * <ul>
jaroslav@1890
    75
 *
jaroslav@1890
    76
 *   <li> <b>Arrival.</b> Methods {@link #arrive} and
jaroslav@1890
    77
 *       {@link #arriveAndDeregister} record arrival.  These methods
jaroslav@1890
    78
 *       do not block, but return an associated <em>arrival phase
jaroslav@1890
    79
 *       number</em>; that is, the phase number of the phaser to which
jaroslav@1890
    80
 *       the arrival applied. When the final party for a given phase
jaroslav@1890
    81
 *       arrives, an optional action is performed and the phase
jaroslav@1890
    82
 *       advances.  These actions are performed by the party
jaroslav@1890
    83
 *       triggering a phase advance, and are arranged by overriding
jaroslav@1890
    84
 *       method {@link #onAdvance(int, int)}, which also controls
jaroslav@1890
    85
 *       termination. Overriding this method is similar to, but more
jaroslav@1890
    86
 *       flexible than, providing a barrier action to a {@code
jaroslav@1890
    87
 *       CyclicBarrier}.
jaroslav@1890
    88
 *
jaroslav@1890
    89
 *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
jaroslav@1890
    90
 *       argument indicating an arrival phase number, and returns when
jaroslav@1890
    91
 *       the phaser advances to (or is already at) a different phase.
jaroslav@1890
    92
 *       Unlike similar constructions using {@code CyclicBarrier},
jaroslav@1890
    93
 *       method {@code awaitAdvance} continues to wait even if the
jaroslav@1890
    94
 *       waiting thread is interrupted. Interruptible and timeout
jaroslav@1890
    95
 *       versions are also available, but exceptions encountered while
jaroslav@1890
    96
 *       tasks wait interruptibly or with timeout do not change the
jaroslav@1890
    97
 *       state of the phaser. If necessary, you can perform any
jaroslav@1890
    98
 *       associated recovery within handlers of those exceptions,
jaroslav@1890
    99
 *       often after invoking {@code forceTermination}.  Phasers may
jaroslav@1890
   100
 *       also be used by tasks executing in a {@link ForkJoinPool},
jaroslav@1890
   101
 *       which will ensure sufficient parallelism to execute tasks
jaroslav@1890
   102
 *       when others are blocked waiting for a phase to advance.
jaroslav@1890
   103
 *
jaroslav@1890
   104
 * </ul>
jaroslav@1890
   105
 *
jaroslav@1890
   106
 * <p> <b>Termination.</b> A phaser may enter a <em>termination</em>
jaroslav@1890
   107
 * state, that may be checked using method {@link #isTerminated}. Upon
jaroslav@1890
   108
 * termination, all synchronization methods immediately return without
jaroslav@1890
   109
 * waiting for advance, as indicated by a negative return value.
jaroslav@1890
   110
 * Similarly, attempts to register upon termination have no effect.
jaroslav@1890
   111
 * Termination is triggered when an invocation of {@code onAdvance}
jaroslav@1890
   112
 * returns {@code true}. The default implementation returns {@code
jaroslav@1890
   113
 * true} if a deregistration has caused the number of registered
jaroslav@1890
   114
 * parties to become zero.  As illustrated below, when phasers control
jaroslav@1890
   115
 * actions with a fixed number of iterations, it is often convenient
jaroslav@1890
   116
 * to override this method to cause termination when the current phase
jaroslav@1890
   117
 * number reaches a threshold. Method {@link #forceTermination} is
jaroslav@1890
   118
 * also available to abruptly release waiting threads and allow them
jaroslav@1890
   119
 * to terminate.
jaroslav@1890
   120
 *
jaroslav@1890
   121
 * <p> <b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
jaroslav@1890
   122
 * constructed in tree structures) to reduce contention. Phasers with
jaroslav@1890
   123
 * large numbers of parties that would otherwise experience heavy
jaroslav@1890
   124
 * synchronization contention costs may instead be set up so that
jaroslav@1890
   125
 * groups of sub-phasers share a common parent.  This may greatly
jaroslav@1890
   126
 * increase throughput even though it incurs greater per-operation
jaroslav@1890
   127
 * overhead.
jaroslav@1890
   128
 *
jaroslav@1890
   129
 * <p>In a tree of tiered phasers, registration and deregistration of
jaroslav@1890
   130
 * child phasers with their parent are managed automatically.
jaroslav@1890
   131
 * Whenever the number of registered parties of a child phaser becomes
jaroslav@1890
   132
 * non-zero (as established in the {@link #Phaser(Phaser,int)}
jaroslav@1890
   133
 * constructor, {@link #register}, or {@link #bulkRegister}), the
jaroslav@1890
   134
 * child phaser is registered with its parent.  Whenever the number of
jaroslav@1890
   135
 * registered parties becomes zero as the result of an invocation of
jaroslav@1890
   136
 * {@link #arriveAndDeregister}, the child phaser is deregistered
jaroslav@1890
   137
 * from its parent.
jaroslav@1890
   138
 *
jaroslav@1890
   139
 * <p><b>Monitoring.</b> While synchronization methods may be invoked
jaroslav@1890
   140
 * only by registered parties, the current state of a phaser may be
jaroslav@1890
   141
 * monitored by any caller.  At any given moment there are {@link
jaroslav@1890
   142
 * #getRegisteredParties} parties in total, of which {@link
jaroslav@1890
   143
 * #getArrivedParties} have arrived at the current phase ({@link
jaroslav@1890
   144
 * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
jaroslav@1890
   145
 * parties arrive, the phase advances.  The values returned by these
jaroslav@1890
   146
 * methods may reflect transient states and so are not in general
jaroslav@1890
   147
 * useful for synchronization control.  Method {@link #toString}
jaroslav@1890
   148
 * returns snapshots of these state queries in a form convenient for
jaroslav@1890
   149
 * informal monitoring.
jaroslav@1890
   150
 *
jaroslav@1890
   151
 * <p><b>Sample usages:</b>
jaroslav@1890
   152
 *
jaroslav@1890
   153
 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
jaroslav@1890
   154
 * to control a one-shot action serving a variable number of parties.
jaroslav@1890
   155
 * The typical idiom is for the method setting this up to first
jaroslav@1890
   156
 * register, then start the actions, then deregister, as in:
jaroslav@1890
   157
 *
jaroslav@1890
   158
 *  <pre> {@code
jaroslav@1890
   159
 * void runTasks(List<Runnable> tasks) {
jaroslav@1890
   160
 *   final Phaser phaser = new Phaser(1); // "1" to register self
jaroslav@1890
   161
 *   // create and start threads
jaroslav@1890
   162
 *   for (final Runnable task : tasks) {
jaroslav@1890
   163
 *     phaser.register();
jaroslav@1890
   164
 *     new Thread() {
jaroslav@1890
   165
 *       public void run() {
jaroslav@1890
   166
 *         phaser.arriveAndAwaitAdvance(); // await all creation
jaroslav@1890
   167
 *         task.run();
jaroslav@1890
   168
 *       }
jaroslav@1890
   169
 *     }.start();
jaroslav@1890
   170
 *   }
jaroslav@1890
   171
 *
jaroslav@1890
   172
 *   // allow threads to start and deregister self
jaroslav@1890
   173
 *   phaser.arriveAndDeregister();
jaroslav@1890
   174
 * }}</pre>
jaroslav@1890
   175
 *
jaroslav@1890
   176
 * <p>One way to cause a set of threads to repeatedly perform actions
jaroslav@1890
   177
 * for a given number of iterations is to override {@code onAdvance}:
jaroslav@1890
   178
 *
jaroslav@1890
   179
 *  <pre> {@code
jaroslav@1890
   180
 * void startTasks(List<Runnable> tasks, final int iterations) {
jaroslav@1890
   181
 *   final Phaser phaser = new Phaser() {
jaroslav@1890
   182
 *     protected boolean onAdvance(int phase, int registeredParties) {
jaroslav@1890
   183
 *       return phase >= iterations || registeredParties == 0;
jaroslav@1890
   184
 *     }
jaroslav@1890
   185
 *   };
jaroslav@1890
   186
 *   phaser.register();
jaroslav@1890
   187
 *   for (final Runnable task : tasks) {
jaroslav@1890
   188
 *     phaser.register();
jaroslav@1890
   189
 *     new Thread() {
jaroslav@1890
   190
 *       public void run() {
jaroslav@1890
   191
 *         do {
jaroslav@1890
   192
 *           task.run();
jaroslav@1890
   193
 *           phaser.arriveAndAwaitAdvance();
jaroslav@1890
   194
 *         } while (!phaser.isTerminated());
jaroslav@1890
   195
 *       }
jaroslav@1890
   196
 *     }.start();
jaroslav@1890
   197
 *   }
jaroslav@1890
   198
 *   phaser.arriveAndDeregister(); // deregister self, don't wait
jaroslav@1890
   199
 * }}</pre>
jaroslav@1890
   200
 *
jaroslav@1890
   201
 * If the main task must later await termination, it
jaroslav@1890
   202
 * may re-register and then execute a similar loop:
jaroslav@1890
   203
 *  <pre> {@code
jaroslav@1890
   204
 *   // ...
jaroslav@1890
   205
 *   phaser.register();
jaroslav@1890
   206
 *   while (!phaser.isTerminated())
jaroslav@1890
   207
 *     phaser.arriveAndAwaitAdvance();}</pre>
jaroslav@1890
   208
 *
jaroslav@1890
   209
 * <p>Related constructions may be used to await particular phase numbers
jaroslav@1890
   210
 * in contexts where you are sure that the phase will never wrap around
jaroslav@1890
   211
 * {@code Integer.MAX_VALUE}. For example:
jaroslav@1890
   212
 *
jaroslav@1890
   213
 *  <pre> {@code
jaroslav@1890
   214
 * void awaitPhase(Phaser phaser, int phase) {
jaroslav@1890
   215
 *   int p = phaser.register(); // assumes caller not already registered
jaroslav@1890
   216
 *   while (p < phase) {
jaroslav@1890
   217
 *     if (phaser.isTerminated())
jaroslav@1890
   218
 *       // ... deal with unexpected termination
jaroslav@1890
   219
 *     else
jaroslav@1890
   220
 *       p = phaser.arriveAndAwaitAdvance();
jaroslav@1890
   221
 *   }
jaroslav@1890
   222
 *   phaser.arriveAndDeregister();
jaroslav@1890
   223
 * }}</pre>
jaroslav@1890
   224
 *
jaroslav@1890
   225
 *
jaroslav@1890
   226
 * <p>To create a set of {@code n} tasks using a tree of phasers, you
jaroslav@1890
   227
 * could use code of the following form, assuming a Task class with a
jaroslav@1890
   228
 * constructor accepting a {@code Phaser} that it registers with upon
jaroslav@1890
   229
 * construction. After invocation of {@code build(new Task[n], 0, n,
jaroslav@1890
   230
 * new Phaser())}, these tasks could then be started, for example by
jaroslav@1890
   231
 * submitting to a pool:
jaroslav@1890
   232
 *
jaroslav@1890
   233
 *  <pre> {@code
jaroslav@1890
   234
 * void build(Task[] tasks, int lo, int hi, Phaser ph) {
jaroslav@1890
   235
 *   if (hi - lo > TASKS_PER_PHASER) {
jaroslav@1890
   236
 *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
jaroslav@1890
   237
 *       int j = Math.min(i + TASKS_PER_PHASER, hi);
jaroslav@1890
   238
 *       build(tasks, i, j, new Phaser(ph));
jaroslav@1890
   239
 *     }
jaroslav@1890
   240
 *   } else {
jaroslav@1890
   241
 *     for (int i = lo; i < hi; ++i)
jaroslav@1890
   242
 *       tasks[i] = new Task(ph);
jaroslav@1890
   243
 *       // assumes new Task(ph) performs ph.register()
jaroslav@1890
   244
 *   }
jaroslav@1890
   245
 * }}</pre>
jaroslav@1890
   246
 *
jaroslav@1890
   247
 * The best value of {@code TASKS_PER_PHASER} depends mainly on
jaroslav@1890
   248
 * expected synchronization rates. A value as low as four may
jaroslav@1890
   249
 * be appropriate for extremely small per-phase task bodies (thus
jaroslav@1890
   250
 * high rates), or up to hundreds for extremely large ones.
jaroslav@1890
   251
 *
jaroslav@1890
   252
 * <p><b>Implementation notes</b>: This implementation restricts the
jaroslav@1890
   253
 * maximum number of parties to 65535. Attempts to register additional
jaroslav@1890
   254
 * parties result in {@code IllegalStateException}. However, you can and
jaroslav@1890
   255
 * should create tiered phasers to accommodate arbitrarily large sets
jaroslav@1890
   256
 * of participants.
jaroslav@1890
   257
 *
jaroslav@1890
   258
 * @since 1.7
jaroslav@1890
   259
 * @author Doug Lea
jaroslav@1890
   260
 */
jaroslav@1890
   261
public class Phaser {
jaroslav@1890
   262
    /*
jaroslav@1890
   263
     * This class implements an extension of X10 "clocks".  Thanks to
jaroslav@1890
   264
     * Vijay Saraswat for the idea, and to Vivek Sarkar for
jaroslav@1890
   265
     * enhancements to extend functionality.
jaroslav@1890
   266
     */
jaroslav@1890
   267
jaroslav@1890
   268
    /**
jaroslav@1890
   269
     * Primary state representation, holding four bit-fields:
jaroslav@1890
   270
     *
jaroslav@1890
   271
     * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
jaroslav@1890
   272
     * parties    -- the number of parties to wait            (bits 16-31)
jaroslav@1890
   273
     * phase      -- the generation of the barrier            (bits 32-62)
jaroslav@1890
   274
     * terminated -- set if barrier is terminated             (bit  63 / sign)
jaroslav@1890
   275
     *
jaroslav@1890
   276
     * Except that a phaser with no registered parties is
jaroslav@1890
   277
     * distinguished by the otherwise illegal state of having zero
jaroslav@1890
   278
     * parties and one unarrived parties (encoded as EMPTY below).
jaroslav@1890
   279
     *
jaroslav@1890
   280
     * To efficiently maintain atomicity, these values are packed into
jaroslav@1890
   281
     * a single (atomic) long. Good performance relies on keeping
jaroslav@1890
   282
     * state decoding and encoding simple, and keeping race windows
jaroslav@1890
   283
     * short.
jaroslav@1890
   284
     *
jaroslav@1890
   285
     * All state updates are performed via CAS except initial
jaroslav@1890
   286
     * registration of a sub-phaser (i.e., one with a non-null
jaroslav@1890
   287
     * parent).  In this (relatively rare) case, we use built-in
jaroslav@1890
   288
     * synchronization to lock while first registering with its
jaroslav@1890
   289
     * parent.
jaroslav@1890
   290
     *
jaroslav@1890
   291
     * The phase of a subphaser is allowed to lag that of its
jaroslav@1890
   292
     * ancestors until it is actually accessed -- see method
jaroslav@1890
   293
     * reconcileState.
jaroslav@1890
   294
     */
jaroslav@1890
   295
    private volatile long state;
jaroslav@1890
   296
jaroslav@1890
   297
    private static final int  MAX_PARTIES     = 0xffff;
jaroslav@1890
   298
    private static final int  MAX_PHASE       = Integer.MAX_VALUE;
jaroslav@1890
   299
    private static final int  PARTIES_SHIFT   = 16;
jaroslav@1890
   300
    private static final int  PHASE_SHIFT     = 32;
jaroslav@1890
   301
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
jaroslav@1890
   302
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
jaroslav@1890
   303
    private static final long TERMINATION_BIT = 1L << 63;
jaroslav@1890
   304
jaroslav@1890
   305
    // some special values
jaroslav@1890
   306
    private static final int  ONE_ARRIVAL     = 1;
jaroslav@1890
   307
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
jaroslav@1890
   308
    private static final int  EMPTY           = 1;
jaroslav@1890
   309
jaroslav@1890
   310
    // The following unpacking methods are usually manually inlined
jaroslav@1890
   311
jaroslav@1890
   312
    private static int unarrivedOf(long s) {
jaroslav@1890
   313
        int counts = (int)s;
jaroslav@1890
   314
        return (counts == EMPTY) ? 0 : counts & UNARRIVED_MASK;
jaroslav@1890
   315
    }
jaroslav@1890
   316
jaroslav@1890
   317
    private static int partiesOf(long s) {
jaroslav@1890
   318
        return (int)s >>> PARTIES_SHIFT;
jaroslav@1890
   319
    }
jaroslav@1890
   320
jaroslav@1890
   321
    private static int phaseOf(long s) {
jaroslav@1890
   322
        return (int)(s >>> PHASE_SHIFT);
jaroslav@1890
   323
    }
jaroslav@1890
   324
jaroslav@1890
   325
    private static int arrivedOf(long s) {
jaroslav@1890
   326
        int counts = (int)s;
jaroslav@1890
   327
        return (counts == EMPTY) ? 0 :
jaroslav@1890
   328
            (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
jaroslav@1890
   329
    }
jaroslav@1890
   330
jaroslav@1890
   331
    /**
jaroslav@1890
   332
     * The parent of this phaser, or null if none
jaroslav@1890
   333
     */
jaroslav@1890
   334
    private final Phaser parent;
jaroslav@1890
   335
jaroslav@1890
   336
    /**
jaroslav@1890
   337
     * The root of phaser tree. Equals this if not in a tree.
jaroslav@1890
   338
     */
jaroslav@1890
   339
    private final Phaser root;
jaroslav@1890
   340
jaroslav@1890
   341
    /**
jaroslav@1890
   342
     * Heads of Treiber stacks for waiting threads. To eliminate
jaroslav@1890
   343
     * contention when releasing some threads while adding others, we
jaroslav@1890
   344
     * use two of them, alternating across even and odd phases.
jaroslav@1890
   345
     * Subphasers share queues with root to speed up releases.
jaroslav@1890
   346
     */
jaroslav@1890
   347
    private final AtomicReference<QNode> evenQ;
jaroslav@1890
   348
    private final AtomicReference<QNode> oddQ;
jaroslav@1890
   349
jaroslav@1890
   350
    private AtomicReference<QNode> queueFor(int phase) {
jaroslav@1890
   351
        return ((phase & 1) == 0) ? evenQ : oddQ;
jaroslav@1890
   352
    }
jaroslav@1890
   353
jaroslav@1890
   354
    /**
jaroslav@1890
   355
     * Returns message string for bounds exceptions on arrival.
jaroslav@1890
   356
     */
jaroslav@1890
   357
    private String badArrive(long s) {
jaroslav@1890
   358
        return "Attempted arrival of unregistered party for " +
jaroslav@1890
   359
            stateToString(s);
jaroslav@1890
   360
    }
jaroslav@1890
   361
jaroslav@1890
   362
    /**
jaroslav@1890
   363
     * Returns message string for bounds exceptions on registration.
jaroslav@1890
   364
     */
jaroslav@1890
   365
    private String badRegister(long s) {
jaroslav@1890
   366
        return "Attempt to register more than " +
jaroslav@1890
   367
            MAX_PARTIES + " parties for " + stateToString(s);
jaroslav@1890
   368
    }
jaroslav@1890
   369
jaroslav@1890
   370
    /**
jaroslav@1890
   371
     * Main implementation for methods arrive and arriveAndDeregister.
jaroslav@1890
   372
     * Manually tuned to speed up and minimize race windows for the
jaroslav@1890
   373
     * common case of just decrementing unarrived field.
jaroslav@1890
   374
     *
jaroslav@1890
   375
     * @param deregister false for arrive, true for arriveAndDeregister
jaroslav@1890
   376
     */
jaroslav@1890
   377
    private int doArrive(boolean deregister) {
jaroslav@1890
   378
        int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
jaroslav@1890
   379
        final Phaser root = this.root;
jaroslav@1890
   380
        for (;;) {
jaroslav@1890
   381
            long s = (root == this) ? state : reconcileState();
jaroslav@1890
   382
            int phase = (int)(s >>> PHASE_SHIFT);
jaroslav@1890
   383
            int counts = (int)s;
jaroslav@1890
   384
            int unarrived = (counts & UNARRIVED_MASK) - 1;
jaroslav@1890
   385
            if (phase < 0)
jaroslav@1890
   386
                return phase;
jaroslav@1890
   387
            else if (counts == EMPTY || unarrived < 0) {
jaroslav@1890
   388
                if (root == this || reconcileState() == s)
jaroslav@1890
   389
                    throw new IllegalStateException(badArrive(s));
jaroslav@1890
   390
            }
jaroslav@1895
   391
            else if (compareAndSwapLong(s, s-=adj)) {
jaroslav@1890
   392
                if (unarrived == 0) {
jaroslav@1890
   393
                    long n = s & PARTIES_MASK;  // base of next state
jaroslav@1890
   394
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
jaroslav@1890
   395
                    if (root != this)
jaroslav@1890
   396
                        return parent.doArrive(nextUnarrived == 0);
jaroslav@1890
   397
                    if (onAdvance(phase, nextUnarrived))
jaroslav@1890
   398
                        n |= TERMINATION_BIT;
jaroslav@1890
   399
                    else if (nextUnarrived == 0)
jaroslav@1890
   400
                        n |= EMPTY;
jaroslav@1890
   401
                    else
jaroslav@1890
   402
                        n |= nextUnarrived;
jaroslav@1890
   403
                    n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;
jaroslav@1895
   404
                    compareAndSwapLong(s, n);
jaroslav@1890
   405
                    releaseWaiters(phase);
jaroslav@1890
   406
                }
jaroslav@1890
   407
                return phase;
jaroslav@1890
   408
            }
jaroslav@1890
   409
        }
jaroslav@1890
   410
    }
jaroslav@1890
   411
jaroslav@1890
   412
    /**
jaroslav@1890
   413
     * Implementation of register, bulkRegister
jaroslav@1890
   414
     *
jaroslav@1890
   415
     * @param registrations number to add to both parties and
jaroslav@1890
   416
     * unarrived fields. Must be greater than zero.
jaroslav@1890
   417
     */
jaroslav@1890
   418
    private int doRegister(int registrations) {
jaroslav@1890
   419
        // adjustment to state
jaroslav@1890
   420
        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
jaroslav@1890
   421
        final Phaser parent = this.parent;
jaroslav@1890
   422
        int phase;
jaroslav@1890
   423
        for (;;) {
jaroslav@1890
   424
            long s = state;
jaroslav@1890
   425
            int counts = (int)s;
jaroslav@1890
   426
            int parties = counts >>> PARTIES_SHIFT;
jaroslav@1890
   427
            int unarrived = counts & UNARRIVED_MASK;
jaroslav@1890
   428
            if (registrations > MAX_PARTIES - parties)
jaroslav@1890
   429
                throw new IllegalStateException(badRegister(s));
jaroslav@1890
   430
            else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
jaroslav@1890
   431
                break;
jaroslav@1890
   432
            else if (counts != EMPTY) {             // not 1st registration
jaroslav@1890
   433
                if (parent == null || reconcileState() == s) {
jaroslav@1890
   434
                    if (unarrived == 0)             // wait out advance
jaroslav@1890
   435
                        root.internalAwaitAdvance(phase, null);
jaroslav@1895
   436
                    else if (compareAndSwapLong(                                                       s, s + adj))
jaroslav@1890
   437
                        break;
jaroslav@1890
   438
                }
jaroslav@1890
   439
            }
jaroslav@1890
   440
            else if (parent == null) {              // 1st root registration
jaroslav@1890
   441
                long next = ((long)phase << PHASE_SHIFT) | adj;
jaroslav@1895
   442
                if (compareAndSwapLong(s, next))
jaroslav@1890
   443
                    break;
jaroslav@1890
   444
            }
jaroslav@1890
   445
            else {
jaroslav@1890
   446
                synchronized (this) {               // 1st sub registration
jaroslav@1890
   447
                    if (state == s) {               // recheck under lock
jaroslav@1890
   448
                        parent.doRegister(1);
jaroslav@1890
   449
                        do {                        // force current phase
jaroslav@1890
   450
                            phase = (int)(root.state >>> PHASE_SHIFT);
jaroslav@1890
   451
                            // assert phase < 0 || (int)state == EMPTY;
jaroslav@1895
   452
                        } while (!compareAndSwapLong
jaroslav@1895
   453
                                 (state,
jaroslav@1890
   454
                                  ((long)phase << PHASE_SHIFT) | adj));
jaroslav@1890
   455
                        break;
jaroslav@1890
   456
                    }
jaroslav@1890
   457
                }
jaroslav@1890
   458
            }
jaroslav@1890
   459
        }
jaroslav@1890
   460
        return phase;
jaroslav@1890
   461
    }
jaroslav@1890
   462
jaroslav@1890
   463
    /**
jaroslav@1890
   464
     * Resolves lagged phase propagation from root if necessary.
jaroslav@1890
   465
     * Reconciliation normally occurs when root has advanced but
jaroslav@1890
   466
     * subphasers have not yet done so, in which case they must finish
jaroslav@1890
   467
     * their own advance by setting unarrived to parties (or if
jaroslav@1890
   468
     * parties is zero, resetting to unregistered EMPTY state).
jaroslav@1890
   469
     * However, this method may also be called when "floating"
jaroslav@1890
   470
     * subphasers with possibly some unarrived parties are merely
jaroslav@1890
   471
     * catching up to current phase, in which case counts are
jaroslav@1890
   472
     * unaffected.
jaroslav@1890
   473
     *
jaroslav@1890
   474
     * @return reconciled state
jaroslav@1890
   475
     */
jaroslav@1890
   476
    private long reconcileState() {
jaroslav@1890
   477
        final Phaser root = this.root;
jaroslav@1890
   478
        long s = state;
jaroslav@1890
   479
        if (root != this) {
jaroslav@1890
   480
            int phase, u, p;
jaroslav@1890
   481
            // CAS root phase with current parties; possibly trip unarrived
jaroslav@1890
   482
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
jaroslav@1890
   483
                   (int)(s >>> PHASE_SHIFT) &&
jaroslav@1895
   484
                   !compareAndSwapLong
jaroslav@1895
   485
                   (s,
jaroslav@1890
   486
                    s = (((long)phase << PHASE_SHIFT) |
jaroslav@1890
   487
                         (s & PARTIES_MASK) |
jaroslav@1890
   488
                         ((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
jaroslav@1890
   489
                          (u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
jaroslav@1890
   490
                s = state;
jaroslav@1890
   491
        }
jaroslav@1890
   492
        return s;
jaroslav@1890
   493
    }
jaroslav@1890
   494
jaroslav@1890
   495
    /**
jaroslav@1890
   496
     * Creates a new phaser with no initially registered parties, no
jaroslav@1890
   497
     * parent, and initial phase number 0. Any thread using this
jaroslav@1890
   498
     * phaser will need to first register for it.
jaroslav@1890
   499
     */
jaroslav@1890
   500
    public Phaser() {
jaroslav@1890
   501
        this(null, 0);
jaroslav@1890
   502
    }
jaroslav@1890
   503
jaroslav@1890
   504
    /**
jaroslav@1890
   505
     * Creates a new phaser with the given number of registered
jaroslav@1890
   506
     * unarrived parties, no parent, and initial phase number 0.
jaroslav@1890
   507
     *
jaroslav@1890
   508
     * @param parties the number of parties required to advance to the
jaroslav@1890
   509
     * next phase
jaroslav@1890
   510
     * @throws IllegalArgumentException if parties less than zero
jaroslav@1890
   511
     * or greater than the maximum number of parties supported
jaroslav@1890
   512
     */
jaroslav@1890
   513
    public Phaser(int parties) {
jaroslav@1890
   514
        this(null, parties);
jaroslav@1890
   515
    }
jaroslav@1890
   516
jaroslav@1890
   517
    /**
jaroslav@1890
   518
     * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
jaroslav@1890
   519
     *
jaroslav@1890
   520
     * @param parent the parent phaser
jaroslav@1890
   521
     */
jaroslav@1890
   522
    public Phaser(Phaser parent) {
jaroslav@1890
   523
        this(parent, 0);
jaroslav@1890
   524
    }
jaroslav@1890
   525
jaroslav@1890
   526
    /**
jaroslav@1890
   527
     * Creates a new phaser with the given parent and number of
jaroslav@1890
   528
     * registered unarrived parties.  When the given parent is non-null
jaroslav@1890
   529
     * and the given number of parties is greater than zero, this
jaroslav@1890
   530
     * child phaser is registered with its parent.
jaroslav@1890
   531
     *
jaroslav@1890
   532
     * @param parent the parent phaser
jaroslav@1890
   533
     * @param parties the number of parties required to advance to the
jaroslav@1890
   534
     * next phase
jaroslav@1890
   535
     * @throws IllegalArgumentException if parties less than zero
jaroslav@1890
   536
     * or greater than the maximum number of parties supported
jaroslav@1890
   537
     */
jaroslav@1890
   538
    public Phaser(Phaser parent, int parties) {
jaroslav@1890
   539
        if (parties >>> PARTIES_SHIFT != 0)
jaroslav@1890
   540
            throw new IllegalArgumentException("Illegal number of parties");
jaroslav@1890
   541
        int phase = 0;
jaroslav@1890
   542
        this.parent = parent;
jaroslav@1890
   543
        if (parent != null) {
jaroslav@1890
   544
            final Phaser root = parent.root;
jaroslav@1890
   545
            this.root = root;
jaroslav@1890
   546
            this.evenQ = root.evenQ;
jaroslav@1890
   547
            this.oddQ = root.oddQ;
jaroslav@1890
   548
            if (parties != 0)
jaroslav@1890
   549
                phase = parent.doRegister(1);
jaroslav@1890
   550
        }
jaroslav@1890
   551
        else {
jaroslav@1890
   552
            this.root = this;
jaroslav@1890
   553
            this.evenQ = new AtomicReference<QNode>();
jaroslav@1890
   554
            this.oddQ = new AtomicReference<QNode>();
jaroslav@1890
   555
        }
jaroslav@1890
   556
        this.state = (parties == 0) ? (long)EMPTY :
jaroslav@1890
   557
            ((long)phase << PHASE_SHIFT) |
jaroslav@1890
   558
            ((long)parties << PARTIES_SHIFT) |
jaroslav@1890
   559
            ((long)parties);
jaroslav@1890
   560
    }
jaroslav@1890
   561
jaroslav@1890
   562
    /**
jaroslav@1890
   563
     * Adds a new unarrived party to this phaser.  If an ongoing
jaroslav@1890
   564
     * invocation of {@link #onAdvance} is in progress, this method
jaroslav@1890
   565
     * may await its completion before returning.  If this phaser has
jaroslav@1890
   566
     * a parent, and this phaser previously had no registered parties,
jaroslav@1890
   567
     * this child phaser is also registered with its parent. If
jaroslav@1890
   568
     * this phaser is terminated, the attempt to register has
jaroslav@1890
   569
     * no effect, and a negative value is returned.
jaroslav@1890
   570
     *
jaroslav@1890
   571
     * @return the arrival phase number to which this registration
jaroslav@1890
   572
     * applied.  If this value is negative, then this phaser has
jaroslav@1890
   573
     * terminated, in which case registration has no effect.
jaroslav@1890
   574
     * @throws IllegalStateException if attempting to register more
jaroslav@1890
   575
     * than the maximum supported number of parties
jaroslav@1890
   576
     */
jaroslav@1890
   577
    public int register() {
jaroslav@1890
   578
        return doRegister(1);
jaroslav@1890
   579
    }
jaroslav@1890
   580
jaroslav@1890
   581
    /**
jaroslav@1890
   582
     * Adds the given number of new unarrived parties to this phaser.
jaroslav@1890
   583
     * If an ongoing invocation of {@link #onAdvance} is in progress,
jaroslav@1890
   584
     * this method may await its completion before returning.  If this
jaroslav@1890
   585
     * phaser has a parent, and the given number of parties is greater
jaroslav@1890
   586
     * than zero, and this phaser previously had no registered
jaroslav@1890
   587
     * parties, this child phaser is also registered with its parent.
jaroslav@1890
   588
     * If this phaser is terminated, the attempt to register has no
jaroslav@1890
   589
     * effect, and a negative value is returned.
jaroslav@1890
   590
     *
jaroslav@1890
   591
     * @param parties the number of additional parties required to
jaroslav@1890
   592
     * advance to the next phase
jaroslav@1890
   593
     * @return the arrival phase number to which this registration
jaroslav@1890
   594
     * applied.  If this value is negative, then this phaser has
jaroslav@1890
   595
     * terminated, in which case registration has no effect.
jaroslav@1890
   596
     * @throws IllegalStateException if attempting to register more
jaroslav@1890
   597
     * than the maximum supported number of parties
jaroslav@1890
   598
     * @throws IllegalArgumentException if {@code parties < 0}
jaroslav@1890
   599
     */
jaroslav@1890
   600
    public int bulkRegister(int parties) {
jaroslav@1890
   601
        if (parties < 0)
jaroslav@1890
   602
            throw new IllegalArgumentException();
jaroslav@1890
   603
        if (parties == 0)
jaroslav@1890
   604
            return getPhase();
jaroslav@1890
   605
        return doRegister(parties);
jaroslav@1890
   606
    }
jaroslav@1890
   607
jaroslav@1890
   608
    /**
jaroslav@1890
   609
     * Arrives at this phaser, without waiting for others to arrive.
jaroslav@1890
   610
     *
jaroslav@1890
   611
     * <p>It is a usage error for an unregistered party to invoke this
jaroslav@1890
   612
     * method.  However, this error may result in an {@code
jaroslav@1890
   613
     * IllegalStateException} only upon some subsequent operation on
jaroslav@1890
   614
     * this phaser, if ever.
jaroslav@1890
   615
     *
jaroslav@1890
   616
     * @return the arrival phase number, or a negative value if terminated
jaroslav@1890
   617
     * @throws IllegalStateException if not terminated and the number
jaroslav@1890
   618
     * of unarrived parties would become negative
jaroslav@1890
   619
     */
jaroslav@1890
   620
    public int arrive() {
jaroslav@1890
   621
        return doArrive(false);
jaroslav@1890
   622
    }
jaroslav@1890
   623
jaroslav@1890
   624
    /**
jaroslav@1890
   625
     * Arrives at this phaser and deregisters from it without waiting
jaroslav@1890
   626
     * for others to arrive. Deregistration reduces the number of
jaroslav@1890
   627
     * parties required to advance in future phases.  If this phaser
jaroslav@1890
   628
     * has a parent, and deregistration causes this phaser to have
jaroslav@1890
   629
     * zero parties, this phaser is also deregistered from its parent.
jaroslav@1890
   630
     *
jaroslav@1890
   631
     * <p>It is a usage error for an unregistered party to invoke this
jaroslav@1890
   632
     * method.  However, this error may result in an {@code
jaroslav@1890
   633
     * IllegalStateException} only upon some subsequent operation on
jaroslav@1890
   634
     * this phaser, if ever.
jaroslav@1890
   635
     *
jaroslav@1890
   636
     * @return the arrival phase number, or a negative value if terminated
jaroslav@1890
   637
     * @throws IllegalStateException if not terminated and the number
jaroslav@1890
   638
     * of registered or unarrived parties would become negative
jaroslav@1890
   639
     */
jaroslav@1890
   640
    public int arriveAndDeregister() {
jaroslav@1890
   641
        return doArrive(true);
jaroslav@1890
   642
    }
jaroslav@1890
   643
jaroslav@1890
   644
    /**
jaroslav@1890
   645
     * Arrives at this phaser and awaits others. Equivalent in effect
jaroslav@1890
   646
     * to {@code awaitAdvance(arrive())}.  If you need to await with
jaroslav@1890
   647
     * interruption or timeout, you can arrange this with an analogous
jaroslav@1890
   648
     * construction using one of the other forms of the {@code
jaroslav@1890
   649
     * awaitAdvance} method.  If instead you need to deregister upon
jaroslav@1890
   650
     * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
jaroslav@1890
   651
     *
jaroslav@1890
   652
     * <p>It is a usage error for an unregistered party to invoke this
jaroslav@1890
   653
     * method.  However, this error may result in an {@code
jaroslav@1890
   654
     * IllegalStateException} only upon some subsequent operation on
jaroslav@1890
   655
     * this phaser, if ever.
jaroslav@1890
   656
     *
jaroslav@1890
   657
     * @return the arrival phase number, or the (negative)
jaroslav@1890
   658
     * {@linkplain #getPhase() current phase} if terminated
jaroslav@1890
   659
     * @throws IllegalStateException if not terminated and the number
jaroslav@1890
   660
     * of unarrived parties would become negative
jaroslav@1890
   661
     */
jaroslav@1890
   662
    public int arriveAndAwaitAdvance() {
jaroslav@1890
   663
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
jaroslav@1890
   664
        final Phaser root = this.root;
jaroslav@1890
   665
        for (;;) {
jaroslav@1890
   666
            long s = (root == this) ? state : reconcileState();
jaroslav@1890
   667
            int phase = (int)(s >>> PHASE_SHIFT);
jaroslav@1890
   668
            int counts = (int)s;
jaroslav@1890
   669
            int unarrived = (counts & UNARRIVED_MASK) - 1;
jaroslav@1890
   670
            if (phase < 0)
jaroslav@1890
   671
                return phase;
jaroslav@1890
   672
            else if (counts == EMPTY || unarrived < 0) {
jaroslav@1890
   673
                if (reconcileState() == s)
jaroslav@1890
   674
                    throw new IllegalStateException(badArrive(s));
jaroslav@1890
   675
            }
jaroslav@1895
   676
            else if (compareAndSwapLong(s,
jaroslav@1890
   677
                                               s -= ONE_ARRIVAL)) {
jaroslav@1890
   678
                if (unarrived != 0)
jaroslav@1890
   679
                    return root.internalAwaitAdvance(phase, null);
jaroslav@1890
   680
                if (root != this)
jaroslav@1890
   681
                    return parent.arriveAndAwaitAdvance();
jaroslav@1890
   682
                long n = s & PARTIES_MASK;  // base of next state
jaroslav@1890
   683
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
jaroslav@1890
   684
                if (onAdvance(phase, nextUnarrived))
jaroslav@1890
   685
                    n |= TERMINATION_BIT;
jaroslav@1890
   686
                else if (nextUnarrived == 0)
jaroslav@1890
   687
                    n |= EMPTY;
jaroslav@1890
   688
                else
jaroslav@1890
   689
                    n |= nextUnarrived;
jaroslav@1890
   690
                int nextPhase = (phase + 1) & MAX_PHASE;
jaroslav@1890
   691
                n |= (long)nextPhase << PHASE_SHIFT;
jaroslav@1895
   692
                if (!compareAndSwapLong(s, n))
jaroslav@1890
   693
                    return (int)(state >>> PHASE_SHIFT); // terminated
jaroslav@1890
   694
                releaseWaiters(phase);
jaroslav@1890
   695
                return nextPhase;
jaroslav@1890
   696
            }
jaroslav@1890
   697
        }
jaroslav@1890
   698
    }
jaroslav@1890
   699
jaroslav@1890
   700
    /**
jaroslav@1890
   701
     * Awaits the phase of this phaser to advance from the given phase
jaroslav@1890
   702
     * value, returning immediately if the current phase is not equal
jaroslav@1890
   703
     * to the given phase value or this phaser is terminated.
jaroslav@1890
   704
     *
jaroslav@1890
   705
     * @param phase an arrival phase number, or negative value if
jaroslav@1890
   706
     * terminated; this argument is normally the value returned by a
jaroslav@1890
   707
     * previous call to {@code arrive} or {@code arriveAndDeregister}.
jaroslav@1890
   708
     * @return the next arrival phase number, or the argument if it is
jaroslav@1890
   709
     * negative, or the (negative) {@linkplain #getPhase() current phase}
jaroslav@1890
   710
     * if terminated
jaroslav@1890
   711
     */
jaroslav@1890
   712
    public int awaitAdvance(int phase) {
jaroslav@1890
   713
        final Phaser root = this.root;
jaroslav@1890
   714
        long s = (root == this) ? state : reconcileState();
jaroslav@1890
   715
        int p = (int)(s >>> PHASE_SHIFT);
jaroslav@1890
   716
        if (phase < 0)
jaroslav@1890
   717
            return phase;
jaroslav@1890
   718
        if (p == phase)
jaroslav@1890
   719
            return root.internalAwaitAdvance(phase, null);
jaroslav@1890
   720
        return p;
jaroslav@1890
   721
    }
jaroslav@1890
   722
jaroslav@1890
   723
    /**
jaroslav@1890
   724
     * Awaits the phase of this phaser to advance from the given phase
jaroslav@1890
   725
     * value, throwing {@code InterruptedException} if interrupted
jaroslav@1890
   726
     * while waiting, or returning immediately if the current phase is
jaroslav@1890
   727
     * not equal to the given phase value or this phaser is
jaroslav@1890
   728
     * terminated.
jaroslav@1890
   729
     *
jaroslav@1890
   730
     * @param phase an arrival phase number, or negative value if
jaroslav@1890
   731
     * terminated; this argument is normally the value returned by a
jaroslav@1890
   732
     * previous call to {@code arrive} or {@code arriveAndDeregister}.
jaroslav@1890
   733
     * @return the next arrival phase number, or the argument if it is
jaroslav@1890
   734
     * negative, or the (negative) {@linkplain #getPhase() current phase}
jaroslav@1890
   735
     * if terminated
jaroslav@1890
   736
     * @throws InterruptedException if thread interrupted while waiting
jaroslav@1890
   737
     */
jaroslav@1890
   738
    public int awaitAdvanceInterruptibly(int phase)
jaroslav@1890
   739
        throws InterruptedException {
jaroslav@1890
   740
        final Phaser root = this.root;
jaroslav@1890
   741
        long s = (root == this) ? state : reconcileState();
jaroslav@1890
   742
        int p = (int)(s >>> PHASE_SHIFT);
jaroslav@1890
   743
        if (phase < 0)
jaroslav@1890
   744
            return phase;
jaroslav@1890
   745
        if (p == phase) {
jaroslav@1890
   746
            QNode node = new QNode(this, phase, true, false, 0L);
jaroslav@1890
   747
            p = root.internalAwaitAdvance(phase, node);
jaroslav@1890
   748
            if (node.wasInterrupted)
jaroslav@1890
   749
                throw new InterruptedException();
jaroslav@1890
   750
        }
jaroslav@1890
   751
        return p;
jaroslav@1890
   752
    }
jaroslav@1890
   753
jaroslav@1890
   754
    /**
jaroslav@1890
   755
     * Awaits the phase of this phaser to advance from the given phase
jaroslav@1890
   756
     * value or the given timeout to elapse, throwing {@code
jaroslav@1890
   757
     * InterruptedException} if interrupted while waiting, or
jaroslav@1890
   758
     * returning immediately if the current phase is not equal to the
jaroslav@1890
   759
     * given phase value or this phaser is terminated.
jaroslav@1890
   760
     *
jaroslav@1890
   761
     * @param phase an arrival phase number, or negative value if
jaroslav@1890
   762
     * terminated; this argument is normally the value returned by a
jaroslav@1890
   763
     * previous call to {@code arrive} or {@code arriveAndDeregister}.
jaroslav@1890
   764
     * @param timeout how long to wait before giving up, in units of
jaroslav@1890
   765
     *        {@code unit}
jaroslav@1890
   766
     * @param unit a {@code TimeUnit} determining how to interpret the
jaroslav@1890
   767
     *        {@code timeout} parameter
jaroslav@1890
   768
     * @return the next arrival phase number, or the argument if it is
jaroslav@1890
   769
     * negative, or the (negative) {@linkplain #getPhase() current phase}
jaroslav@1890
   770
     * if terminated
jaroslav@1890
   771
     * @throws InterruptedException if thread interrupted while waiting
jaroslav@1890
   772
     * @throws TimeoutException if timed out while waiting
jaroslav@1890
   773
     */
jaroslav@1890
   774
    public int awaitAdvanceInterruptibly(int phase,
jaroslav@1890
   775
                                         long timeout, TimeUnit unit)
jaroslav@1890
   776
        throws InterruptedException, TimeoutException {
jaroslav@1890
   777
        long nanos = unit.toNanos(timeout);
jaroslav@1890
   778
        final Phaser root = this.root;
jaroslav@1890
   779
        long s = (root == this) ? state : reconcileState();
jaroslav@1890
   780
        int p = (int)(s >>> PHASE_SHIFT);
jaroslav@1890
   781
        if (phase < 0)
jaroslav@1890
   782
            return phase;
jaroslav@1890
   783
        if (p == phase) {
jaroslav@1890
   784
            QNode node = new QNode(this, phase, true, true, nanos);
jaroslav@1890
   785
            p = root.internalAwaitAdvance(phase, node);
jaroslav@1890
   786
            if (node.wasInterrupted)
jaroslav@1890
   787
                throw new InterruptedException();
jaroslav@1890
   788
            else if (p == phase)
jaroslav@1890
   789
                throw new TimeoutException();
jaroslav@1890
   790
        }
jaroslav@1890
   791
        return p;
jaroslav@1890
   792
    }
jaroslav@1890
   793
jaroslav@1890
   794
    /**
jaroslav@1890
   795
     * Forces this phaser to enter termination state.  Counts of
jaroslav@1890
   796
     * registered parties are unaffected.  If this phaser is a member
jaroslav@1890
   797
     * of a tiered set of phasers, then all of the phasers in the set
jaroslav@1890
   798
     * are terminated.  If this phaser is already terminated, this
jaroslav@1890
   799
     * method has no effect.  This method may be useful for
jaroslav@1890
   800
     * coordinating recovery after one or more tasks encounter
jaroslav@1890
   801
     * unexpected exceptions.
jaroslav@1890
   802
     */
jaroslav@1890
   803
    public void forceTermination() {
jaroslav@1890
   804
        // Only need to change root state
jaroslav@1890
   805
        final Phaser root = this.root;
jaroslav@1890
   806
        long s;
jaroslav@1890
   807
        while ((s = root.state) >= 0) {
jaroslav@1895
   808
            if (compareAndSwapLong(                                          s, s | TERMINATION_BIT)) {
jaroslav@1890
   809
                // signal all threads
jaroslav@1890
   810
                releaseWaiters(0);
jaroslav@1890
   811
                releaseWaiters(1);
jaroslav@1890
   812
                return;
jaroslav@1890
   813
            }
jaroslav@1890
   814
        }
jaroslav@1890
   815
    }
jaroslav@1890
   816
jaroslav@1890
   817
    /**
jaroslav@1890
   818
     * Returns the current phase number. The maximum phase number is
jaroslav@1890
   819
     * {@code Integer.MAX_VALUE}, after which it restarts at
jaroslav@1890
   820
     * zero. Upon termination, the phase number is negative,
jaroslav@1890
   821
     * in which case the prevailing phase prior to termination
jaroslav@1890
   822
     * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
jaroslav@1890
   823
     *
jaroslav@1890
   824
     * @return the phase number, or a negative value if terminated
jaroslav@1890
   825
     */
jaroslav@1890
   826
    public final int getPhase() {
jaroslav@1890
   827
        return (int)(root.state >>> PHASE_SHIFT);
jaroslav@1890
   828
    }
jaroslav@1890
   829
jaroslav@1890
   830
    /**
jaroslav@1890
   831
     * Returns the number of parties registered at this phaser.
jaroslav@1890
   832
     *
jaroslav@1890
   833
     * @return the number of parties
jaroslav@1890
   834
     */
jaroslav@1890
   835
    public int getRegisteredParties() {
jaroslav@1890
   836
        return partiesOf(state);
jaroslav@1890
   837
    }
jaroslav@1890
   838
jaroslav@1890
   839
    /**
jaroslav@1890
   840
     * Returns the number of registered parties that have arrived at
jaroslav@1890
   841
     * the current phase of this phaser. If this phaser has terminated,
jaroslav@1890
   842
     * the returned value is meaningless and arbitrary.
jaroslav@1890
   843
     *
jaroslav@1890
   844
     * @return the number of arrived parties
jaroslav@1890
   845
     */
jaroslav@1890
   846
    public int getArrivedParties() {
jaroslav@1890
   847
        return arrivedOf(reconcileState());
jaroslav@1890
   848
    }
jaroslav@1890
   849
jaroslav@1890
   850
    /**
jaroslav@1890
   851
     * Returns the number of registered parties that have not yet
jaroslav@1890
   852
     * arrived at the current phase of this phaser. If this phaser has
jaroslav@1890
   853
     * terminated, the returned value is meaningless and arbitrary.
jaroslav@1890
   854
     *
jaroslav@1890
   855
     * @return the number of unarrived parties
jaroslav@1890
   856
     */
jaroslav@1890
   857
    public int getUnarrivedParties() {
jaroslav@1890
   858
        return unarrivedOf(reconcileState());
jaroslav@1890
   859
    }
jaroslav@1890
   860
jaroslav@1890
   861
    /**
jaroslav@1890
   862
     * Returns the parent of this phaser, or {@code null} if none.
jaroslav@1890
   863
     *
jaroslav@1890
   864
     * @return the parent of this phaser, or {@code null} if none
jaroslav@1890
   865
     */
jaroslav@1890
   866
    public Phaser getParent() {
jaroslav@1890
   867
        return parent;
jaroslav@1890
   868
    }
jaroslav@1890
   869
jaroslav@1890
   870
    /**
jaroslav@1890
   871
     * Returns the root ancestor of this phaser, which is the same as
jaroslav@1890
   872
     * this phaser if it has no parent.
jaroslav@1890
   873
     *
jaroslav@1890
   874
     * @return the root ancestor of this phaser
jaroslav@1890
   875
     */
jaroslav@1890
   876
    public Phaser getRoot() {
jaroslav@1890
   877
        return root;
jaroslav@1890
   878
    }
jaroslav@1890
   879
jaroslav@1890
   880
    /**
jaroslav@1890
   881
     * Returns {@code true} if this phaser has been terminated.
jaroslav@1890
   882
     *
jaroslav@1890
   883
     * @return {@code true} if this phaser has been terminated
jaroslav@1890
   884
     */
jaroslav@1890
   885
    public boolean isTerminated() {
jaroslav@1890
   886
        return root.state < 0L;
jaroslav@1890
   887
    }
jaroslav@1890
   888
jaroslav@1890
   889
    /**
jaroslav@1890
   890
     * Overridable method to perform an action upon impending phase
jaroslav@1890
   891
     * advance, and to control termination. This method is invoked
jaroslav@1890
   892
     * upon arrival of the party advancing this phaser (when all other
jaroslav@1890
   893
     * waiting parties are dormant).  If this method returns {@code
jaroslav@1890
   894
     * true}, this phaser will be set to a final termination state
jaroslav@1890
   895
     * upon advance, and subsequent calls to {@link #isTerminated}
jaroslav@1890
   896
     * will return true. Any (unchecked) Exception or Error thrown by
jaroslav@1890
   897
     * an invocation of this method is propagated to the party
jaroslav@1890
   898
     * attempting to advance this phaser, in which case no advance
jaroslav@1890
   899
     * occurs.
jaroslav@1890
   900
     *
jaroslav@1890
   901
     * <p>The arguments to this method provide the state of the phaser
jaroslav@1890
   902
     * prevailing for the current transition.  The effects of invoking
jaroslav@1890
   903
     * arrival, registration, and waiting methods on this phaser from
jaroslav@1890
   904
     * within {@code onAdvance} are unspecified and should not be
jaroslav@1890
   905
     * relied on.
jaroslav@1890
   906
     *
jaroslav@1890
   907
     * <p>If this phaser is a member of a tiered set of phasers, then
jaroslav@1890
   908
     * {@code onAdvance} is invoked only for its root phaser on each
jaroslav@1890
   909
     * advance.
jaroslav@1890
   910
     *
jaroslav@1890
   911
     * <p>To support the most common use cases, the default
jaroslav@1890
   912
     * implementation of this method returns {@code true} when the
jaroslav@1890
   913
     * number of registered parties has become zero as the result of a
jaroslav@1890
   914
     * party invoking {@code arriveAndDeregister}.  You can disable
jaroslav@1890
   915
     * this behavior, thus enabling continuation upon future
jaroslav@1890
   916
     * registrations, by overriding this method to always return
jaroslav@1890
   917
     * {@code false}:
jaroslav@1890
   918
     *
jaroslav@1890
   919
     * <pre> {@code
jaroslav@1890
   920
     * Phaser phaser = new Phaser() {
jaroslav@1890
   921
     *   protected boolean onAdvance(int phase, int parties) { return false; }
jaroslav@1890
   922
     * }}</pre>
jaroslav@1890
   923
     *
jaroslav@1890
   924
     * @param phase the current phase number on entry to this method,
jaroslav@1890
   925
     * before this phaser is advanced
jaroslav@1890
   926
     * @param registeredParties the current number of registered parties
jaroslav@1890
   927
     * @return {@code true} if this phaser should terminate
jaroslav@1890
   928
     */
jaroslav@1890
   929
    protected boolean onAdvance(int phase, int registeredParties) {
jaroslav@1890
   930
        return registeredParties == 0;
jaroslav@1890
   931
    }
jaroslav@1890
   932
jaroslav@1890
   933
    /**
jaroslav@1890
   934
     * Returns a string identifying this phaser, as well as its
jaroslav@1890
   935
     * state.  The state, in brackets, includes the String {@code
jaroslav@1890
   936
     * "phase = "} followed by the phase number, {@code "parties = "}
jaroslav@1890
   937
     * followed by the number of registered parties, and {@code
jaroslav@1890
   938
     * "arrived = "} followed by the number of arrived parties.
jaroslav@1890
   939
     *
jaroslav@1890
   940
     * @return a string identifying this phaser, as well as its state
jaroslav@1890
   941
     */
jaroslav@1890
   942
    public String toString() {
jaroslav@1890
   943
        return stateToString(reconcileState());
jaroslav@1890
   944
    }
jaroslav@1890
   945
jaroslav@1890
   946
    /**
jaroslav@1890
   947
     * Implementation of toString and string-based error messages
jaroslav@1890
   948
     */
jaroslav@1890
   949
    private String stateToString(long s) {
jaroslav@1890
   950
        return super.toString() +
jaroslav@1890
   951
            "[phase = " + phaseOf(s) +
jaroslav@1890
   952
            " parties = " + partiesOf(s) +
jaroslav@1890
   953
            " arrived = " + arrivedOf(s) + "]";
jaroslav@1890
   954
    }
jaroslav@1890
   955
jaroslav@1890
   956
    // Waiting mechanics
jaroslav@1890
   957
jaroslav@1890
   958
    /**
jaroslav@1890
   959
     * Removes and signals threads from queue for phase.
jaroslav@1890
   960
     */
jaroslav@1890
   961
    private void releaseWaiters(int phase) {
jaroslav@1890
   962
        QNode q;   // first element of queue
jaroslav@1890
   963
        Thread t;  // its thread
jaroslav@1890
   964
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
jaroslav@1890
   965
        while ((q = head.get()) != null &&
jaroslav@1890
   966
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
jaroslav@1890
   967
            if (head.compareAndSet(q, q.next) &&
jaroslav@1890
   968
                (t = q.thread) != null) {
jaroslav@1890
   969
                q.thread = null;
jaroslav@1890
   970
                LockSupport.unpark(t);
jaroslav@1890
   971
            }
jaroslav@1890
   972
        }
jaroslav@1890
   973
    }
jaroslav@1890
   974
jaroslav@1890
   975
    /**
jaroslav@1890
   976
     * Variant of releaseWaiters that additionally tries to remove any
jaroslav@1890
   977
     * nodes no longer waiting for advance due to timeout or
jaroslav@1890
   978
     * interrupt. Currently, nodes are removed only if they are at
jaroslav@1890
   979
     * head of queue, which suffices to reduce memory footprint in
jaroslav@1890
   980
     * most usages.
jaroslav@1890
   981
     *
jaroslav@1890
   982
     * @return current phase on exit
jaroslav@1890
   983
     */
jaroslav@1890
   984
    private int abortWait(int phase) {
jaroslav@1890
   985
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
jaroslav@1890
   986
        for (;;) {
jaroslav@1890
   987
            Thread t;
jaroslav@1890
   988
            QNode q = head.get();
jaroslav@1890
   989
            int p = (int)(root.state >>> PHASE_SHIFT);
jaroslav@1890
   990
            if (q == null || ((t = q.thread) != null && q.phase == p))
jaroslav@1890
   991
                return p;
jaroslav@1890
   992
            if (head.compareAndSet(q, q.next) && t != null) {
jaroslav@1890
   993
                q.thread = null;
jaroslav@1890
   994
                LockSupport.unpark(t);
jaroslav@1890
   995
            }
jaroslav@1890
   996
        }
jaroslav@1890
   997
    }
jaroslav@1890
   998
jaroslav@1890
   999
    /** The number of CPUs, for spin control */
jaroslav@1895
  1000
    private static final int NCPU = 1;
jaroslav@1890
  1001
jaroslav@1890
  1002
    /**
jaroslav@1890
  1003
     * The number of times to spin before blocking while waiting for
jaroslav@1890
  1004
     * advance, per arrival while waiting. On multiprocessors, fully
jaroslav@1890
  1005
     * blocking and waking up a large number of threads all at once is
jaroslav@1890
  1006
     * usually a very slow process, so we use rechargeable spins to
jaroslav@1890
  1007
     * avoid it when threads regularly arrive: When a thread in
jaroslav@1890
  1008
     * internalAwaitAdvance notices another arrival before blocking,
jaroslav@1890
  1009
     * and there appear to be enough CPUs available, it spins
jaroslav@1890
  1010
     * SPINS_PER_ARRIVAL more times before blocking. The value trades
jaroslav@1890
  1011
     * off good-citizenship vs big unnecessary slowdowns.
jaroslav@1890
  1012
     */
jaroslav@1890
  1013
    static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
jaroslav@1890
  1014
jaroslav@1890
  1015
    /**
jaroslav@1890
  1016
     * Possibly blocks and waits for phase to advance unless aborted.
jaroslav@1890
  1017
     * Call only from root node.
jaroslav@1890
  1018
     *
jaroslav@1890
  1019
     * @param phase current phase
jaroslav@1890
  1020
     * @param node if non-null, the wait node to track interrupt and timeout;
jaroslav@1890
  1021
     * if null, denotes noninterruptible wait
jaroslav@1890
  1022
     * @return current phase
jaroslav@1890
  1023
     */
jaroslav@1890
  1024
    private int internalAwaitAdvance(int phase, QNode node) {
jaroslav@1890
  1025
        releaseWaiters(phase-1);          // ensure old queue clean
jaroslav@1890
  1026
        boolean queued = false;           // true when node is enqueued
jaroslav@1890
  1027
        int lastUnarrived = 0;            // to increase spins upon change
jaroslav@1890
  1028
        int spins = SPINS_PER_ARRIVAL;
jaroslav@1890
  1029
        long s;
jaroslav@1890
  1030
        int p;
jaroslav@1890
  1031
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
jaroslav@1890
  1032
            if (node == null) {           // spinning in noninterruptible mode
jaroslav@1890
  1033
                int unarrived = (int)s & UNARRIVED_MASK;
jaroslav@1890
  1034
                if (unarrived != lastUnarrived &&
jaroslav@1890
  1035
                    (lastUnarrived = unarrived) < NCPU)
jaroslav@1890
  1036
                    spins += SPINS_PER_ARRIVAL;
jaroslav@1890
  1037
                boolean interrupted = Thread.interrupted();
jaroslav@1890
  1038
                if (interrupted || --spins < 0) { // need node to record intr
jaroslav@1890
  1039
                    node = new QNode(this, phase, false, false, 0L);
jaroslav@1890
  1040
                    node.wasInterrupted = interrupted;
jaroslav@1890
  1041
                }
jaroslav@1890
  1042
            }
jaroslav@1890
  1043
            else if (node.isReleasable()) // done or aborted
jaroslav@1890
  1044
                break;
jaroslav@1890
  1045
            else if (!queued) {           // push onto queue
jaroslav@1890
  1046
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
jaroslav@1890
  1047
                QNode q = node.next = head.get();
jaroslav@1890
  1048
                if ((q == null || q.phase == phase) &&
jaroslav@1890
  1049
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
jaroslav@1890
  1050
                    queued = head.compareAndSet(q, node);
jaroslav@1890
  1051
            }
jaroslav@1890
  1052
            else {
jaroslav@1890
  1053
                try {
jaroslav@1890
  1054
                    ForkJoinPool.managedBlock(node);
jaroslav@1890
  1055
                } catch (InterruptedException ie) {
jaroslav@1890
  1056
                    node.wasInterrupted = true;
jaroslav@1890
  1057
                }
jaroslav@1890
  1058
            }
jaroslav@1890
  1059
        }
jaroslav@1890
  1060
jaroslav@1890
  1061
        if (node != null) {
jaroslav@1890
  1062
            if (node.thread != null)
jaroslav@1890
  1063
                node.thread = null;       // avoid need for unpark()
jaroslav@1890
  1064
            if (node.wasInterrupted && !node.interruptible)
jaroslav@1890
  1065
                Thread.currentThread().interrupt();
jaroslav@1890
  1066
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
jaroslav@1890
  1067
                return abortWait(phase); // possibly clean up on abort
jaroslav@1890
  1068
        }
jaroslav@1890
  1069
        releaseWaiters(phase);
jaroslav@1890
  1070
        return p;
jaroslav@1890
  1071
    }
jaroslav@1890
  1072
jaroslav@1895
  1073
    private boolean compareAndSwapLong(long s, long l) {
jaroslav@1895
  1074
        if (this.state == s) {
jaroslav@1895
  1075
            this.state = l;
jaroslav@1895
  1076
            return true;
jaroslav@1895
  1077
        }
jaroslav@1895
  1078
        return false;
jaroslav@1895
  1079
    }
jaroslav@1895
  1080
jaroslav@1890
  1081
    /**
jaroslav@1890
  1082
     * Wait nodes for Treiber stack representing wait queue
jaroslav@1890
  1083
     */
jaroslav@1890
  1084
    static final class QNode implements ForkJoinPool.ManagedBlocker {
jaroslav@1890
  1085
        final Phaser phaser;
jaroslav@1890
  1086
        final int phase;
jaroslav@1890
  1087
        final boolean interruptible;
jaroslav@1890
  1088
        final boolean timed;
jaroslav@1890
  1089
        boolean wasInterrupted;
jaroslav@1890
  1090
        long nanos;
jaroslav@1890
  1091
        long lastTime;
jaroslav@1890
  1092
        volatile Thread thread; // nulled to cancel wait
jaroslav@1890
  1093
        QNode next;
jaroslav@1890
  1094
jaroslav@1890
  1095
        QNode(Phaser phaser, int phase, boolean interruptible,
jaroslav@1890
  1096
              boolean timed, long nanos) {
jaroslav@1890
  1097
            this.phaser = phaser;
jaroslav@1890
  1098
            this.phase = phase;
jaroslav@1890
  1099
            this.interruptible = interruptible;
jaroslav@1890
  1100
            this.nanos = nanos;
jaroslav@1890
  1101
            this.timed = timed;
jaroslav@1890
  1102
            this.lastTime = timed ? System.nanoTime() : 0L;
jaroslav@1890
  1103
            thread = Thread.currentThread();
jaroslav@1890
  1104
        }
jaroslav@1890
  1105
jaroslav@1890
  1106
        public boolean isReleasable() {
jaroslav@1890
  1107
            if (thread == null)
jaroslav@1890
  1108
                return true;
jaroslav@1890
  1109
            if (phaser.getPhase() != phase) {
jaroslav@1890
  1110
                thread = null;
jaroslav@1890
  1111
                return true;
jaroslav@1890
  1112
            }
jaroslav@1890
  1113
            if (Thread.interrupted())
jaroslav@1890
  1114
                wasInterrupted = true;
jaroslav@1890
  1115
            if (wasInterrupted && interruptible) {
jaroslav@1890
  1116
                thread = null;
jaroslav@1890
  1117
                return true;
jaroslav@1890
  1118
            }
jaroslav@1890
  1119
            if (timed) {
jaroslav@1890
  1120
                if (nanos > 0L) {
jaroslav@1890
  1121
                    long now = System.nanoTime();
jaroslav@1890
  1122
                    nanos -= now - lastTime;
jaroslav@1890
  1123
                    lastTime = now;
jaroslav@1890
  1124
                }
jaroslav@1890
  1125
                if (nanos <= 0L) {
jaroslav@1890
  1126
                    thread = null;
jaroslav@1890
  1127
                    return true;
jaroslav@1890
  1128
                }
jaroslav@1890
  1129
            }
jaroslav@1890
  1130
            return false;
jaroslav@1890
  1131
        }
jaroslav@1890
  1132
jaroslav@1890
  1133
        public boolean block() {
jaroslav@1890
  1134
            if (isReleasable())
jaroslav@1890
  1135
                return true;
jaroslav@1890
  1136
            else if (!timed)
jaroslav@1890
  1137
                LockSupport.park(this);
jaroslav@1890
  1138
            else if (nanos > 0)
jaroslav@1890
  1139
                LockSupport.parkNanos(this, nanos);
jaroslav@1890
  1140
            return isReleasable();
jaroslav@1890
  1141
        }
jaroslav@1890
  1142
    }
jaroslav@1890
  1143
}