rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinPool.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branch jdk7-b147
changeset 1890 212417b74b72
child 1895 bfaf3300b7ba
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.ArrayList;
    39 import java.util.Arrays;
    40 import java.util.Collection;
    41 import java.util.Collections;
    42 import java.util.List;
    43 import java.util.Random;
    44 import java.util.concurrent.AbstractExecutorService;
    45 import java.util.concurrent.Callable;
    46 import java.util.concurrent.ExecutorService;
    47 import java.util.concurrent.Future;
    48 import java.util.concurrent.RejectedExecutionException;
    49 import java.util.concurrent.RunnableFuture;
    50 import java.util.concurrent.TimeUnit;
    51 import java.util.concurrent.TimeoutException;
    52 import java.util.concurrent.atomic.AtomicInteger;
    53 import java.util.concurrent.locks.LockSupport;
    54 import java.util.concurrent.locks.ReentrantLock;
    55 import java.util.concurrent.locks.Condition;
    56 
    57 /**
    58  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
    59  * A {@code ForkJoinPool} provides the entry point for submissions
    60  * from non-{@code ForkJoinTask} clients, as well as management and
    61  * monitoring operations.
    62  *
    63  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
    64  * ExecutorService} mainly by virtue of employing
    65  * <em>work-stealing</em>: all threads in the pool attempt to find and
    66  * execute subtasks created by other active tasks (eventually blocking
    67  * waiting for work if none exist). This enables efficient processing
    68  * when most tasks spawn other subtasks (as do most {@code
    69  * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
    70  * constructors, {@code ForkJoinPool}s may also be appropriate for use
    71  * with event-style tasks that are never joined.
    72  *
    73  * <p>A {@code ForkJoinPool} is constructed with a given target
    74  * parallelism level; by default, equal to the number of available
    75  * processors. The pool attempts to maintain enough active (or
    76  * available) threads by dynamically adding, suspending, or resuming
    77  * internal worker threads, even if some tasks are stalled waiting to
    78  * join others. However, no such adjustments are guaranteed in the
    79  * face of blocked IO or other unmanaged synchronization. The nested
    80  * {@link ManagedBlocker} interface enables extension of the kinds of
    81  * synchronization accommodated.
    82  *
    83  * <p>In addition to execution and lifecycle control methods, this
    84  * class provides status check methods (for example
    85  * {@link #getStealCount}) that are intended to aid in developing,
    86  * tuning, and monitoring fork/join applications. Also, method
    87  * {@link #toString} returns indications of pool state in a
    88  * convenient form for informal monitoring.
    89  *
    90  * <p> As is the case with other ExecutorServices, there are three
    91  * main task execution methods summarized in the following
    92  * table. These are designed to be used by clients not already engaged
    93  * in fork/join computations in the current pool.  The main forms of
    94  * these methods accept instances of {@code ForkJoinTask}, but
    95  * overloaded forms also allow mixed execution of plain {@code
    96  * Runnable}- or {@code Callable}- based activities as well.  However,
    97  * tasks that are already executing in a pool should normally
    98  * <em>NOT</em> use these pool execution methods, but instead use the
    99  * within-computation forms listed in the table.
   100  *
   101  * <table BORDER CELLPADDING=3 CELLSPACING=1>
   102  *  <tr>
   103  *    <td></td>
   104  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
   105  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
   106  *  </tr>
   107  *  <tr>
   108  *    <td> <b>Arrange async execution</b></td>
   109  *    <td> {@link #execute(ForkJoinTask)}</td>
   110  *    <td> {@link ForkJoinTask#fork}</td>
   111  *  </tr>
   112  *  <tr>
   113  *    <td> <b>Await and obtain result</b></td>
   114  *    <td> {@link #invoke(ForkJoinTask)}</td>
   115  *    <td> {@link ForkJoinTask#invoke}</td>
   116  *  </tr>
   117  *  <tr>
   118  *    <td> <b>Arrange exec and obtain Future</b></td>
   119  *    <td> {@link #submit(ForkJoinTask)}</td>
   120  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
   121  *  </tr>
   122  * </table>
   123  *
   124  * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
   125  * used for all parallel task execution in a program or subsystem.
   126  * Otherwise, use would not usually outweigh the construction and
   127  * bookkeeping overhead of creating a large set of threads. For
   128  * example, a common pool could be used for the {@code SortTasks}
   129  * illustrated in {@link RecursiveAction}. Because {@code
   130  * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
   131  * daemon} mode, there is typically no need to explicitly {@link
   132  * #shutdown} such a pool upon program exit.
   133  *
   134  * <pre>
   135  * static final ForkJoinPool mainPool = new ForkJoinPool();
   136  * ...
   137  * public void sort(long[] array) {
   138  *   mainPool.invoke(new SortTask(array, 0, array.length));
   139  * }
   140  * </pre>
   141  *
   142  * <p><b>Implementation notes</b>: This implementation restricts the
   143  * maximum number of running threads to 32767. Attempts to create
   144  * pools with greater than the maximum number result in
   145  * {@code IllegalArgumentException}.
   146  *
   147  * <p>This implementation rejects submitted tasks (that is, by throwing
   148  * {@link RejectedExecutionException}) only when the pool is shut down
   149  * or internal resources have been exhausted.
   150  *
   151  * @since 1.7
   152  * @author Doug Lea
   153  */
   154 public class ForkJoinPool extends AbstractExecutorService {
   155 
   156     /*
   157      * Implementation Overview
   158      *
   159      * This class provides the central bookkeeping and control for a
   160      * set of worker threads: Submissions from non-FJ threads enter
   161      * into a submission queue. Workers take these tasks and typically
   162      * split them into subtasks that may be stolen by other workers.
   163      * Preference rules give first priority to processing tasks from
   164      * their own queues (LIFO or FIFO, depending on mode), then to
   165      * randomized FIFO steals of tasks in other worker queues, and
   166      * lastly to new submissions.
   167      *
   168      * The main throughput advantages of work-stealing stem from
   169      * decentralized control -- workers mostly take tasks from
   170      * themselves or each other. We cannot negate this in the
   171      * implementation of other management responsibilities. The main
   172      * tactic for avoiding bottlenecks is packing nearly all
   173      * essentially atomic control state into a single 64bit volatile
   174      * variable ("ctl"). This variable is read on the order of 10-100
   175      * times as often as it is modified (always via CAS). (There is
   176      * some additional control state, for example variable "shutdown"
   177      * for which we can cope with uncoordinated updates.)  This
   178      * streamlines synchronization and control at the expense of messy
   179      * constructions needed to repack status bits upon updates.
   180      * Updates tend not to contend with each other except during
   181      * bursts while submitted tasks begin or end.  In some cases when
   182      * they do contend, threads can instead do something else
   183      * (usually, scan for tasks) until contention subsides.
   184      *
   185      * To enable packing, we restrict maximum parallelism to (1<<15)-1
   186      * (which is far in excess of normal operating range) to allow
   187      * ids, counts, and their negations (used for thresholding) to fit
   188      * into 16bit fields.
   189      *
   190      * Recording Workers.  Workers are recorded in the "workers" array
   191      * that is created upon pool construction and expanded if (rarely)
   192      * necessary.  This is an array as opposed to some other data
   193      * structure to support index-based random steals by workers.
   194      * Updates to the array recording new workers and unrecording
   195      * terminated ones are protected from each other by a seqLock
   196      * (scanGuard) but the array is otherwise concurrently readable,
   197      * and accessed directly by workers. To simplify index-based
   198      * operations, the array size is always a power of two, and all
   199      * readers must tolerate null slots. To avoid flailing during
   200      * start-up, the array is presized to hold twice #parallelism
   201      * workers (which is unlikely to need further resizing during
   202      * execution). But to avoid dealing with so many null slots,
   203      * variable scanGuard includes a mask for the nearest power of two
   204      * that contains all current workers.  All worker thread creation
   205      * is on-demand, triggered by task submissions, replacement of
   206      * terminated workers, and/or compensation for blocked
   207      * workers. However, all other support code is set up to work with
   208      * other policies.  To ensure that we do not hold on to worker
   209      * references that would prevent GC, ALL accesses to workers are
   210      * via indices into the workers array (which is one source of some
   211      * of the messy code constructions here). In essence, the workers
   212      * array serves as a weak reference mechanism. Thus for example
   213      * the wait queue field of ctl stores worker indices, not worker
   214      * references.  Access to the workers in associated methods (for
   215      * example signalWork) must both index-check and null-check the
   216      * IDs. All such accesses ignore bad IDs by returning out early
   217      * from what they are doing, since this can only be associated
   218      * with termination, in which case it is OK to give up.
   219      *
   220      * All uses of the workers array, as well as queue arrays, check
   221      * that the array is non-null (even if previously non-null). This
   222      * allows nulling during termination, which is currently not
   223      * necessary, but remains an option for resource-revocation-based
   224      * shutdown schemes.
   225      *
   226      * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
   227      * let workers spin indefinitely scanning for tasks when none can
   228      * be found immediately, and we cannot start/resume workers unless
   229      * there appear to be tasks available.  On the other hand, we must
   230      * quickly prod them into action when new tasks are submitted or
   231      * generated.  We park/unpark workers after placing in an event
   232      * wait queue when they cannot find work. This "queue" is actually
   233      * a simple Treiber stack, headed by the "id" field of ctl, plus a
   234      * 15bit counter value to both wake up waiters (by advancing their
   235      * count) and avoid ABA effects. Successors are held in worker
   236      * field "nextWait".  Queuing deals with several intrinsic races,
   237      * mainly that a task-producing thread can miss seeing (and
   238      * signalling) another thread that gave up looking for work but
   239      * has not yet entered the wait queue. We solve this by requiring
   240      * a full sweep of all workers both before (in scan()) and after
   241      * (in tryAwaitWork()) a newly waiting worker is added to the wait
   242      * queue. During a rescan, the worker might release some other
   243      * queued worker rather than itself, which has the same net
   244      * effect. Because enqueued workers may actually be rescanning
   245      * rather than waiting, we set and clear the "parked" field of
   246      * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
   247      * (Use of the parked field requires a secondary recheck to avoid
   248      * missed signals.)
   249      *
   250      * Signalling.  We create or wake up workers only when there
   251      * appears to be at least one task they might be able to find and
   252      * execute.  When a submission is added or another worker adds a
   253      * task to a queue that previously had two or fewer tasks, they
   254      * signal waiting workers (or trigger creation of new ones if
   255      * fewer than the given parallelism level -- see signalWork).
   256      * These primary signals are buttressed by signals during rescans
   257      * as well as those performed when a worker steals a task and
   258      * notices that there are more tasks too; together these cover the
   259      * signals needed in cases when more than two tasks are pushed
   260      * but untaken.
   261      *
   262      * Trimming workers. To release resources after periods of lack of
   263      * use, a worker starting to wait when the pool is quiescent will
   264      * time out and terminate if the pool has remained quiescent for
   265      * SHRINK_RATE nanosecs. This will slowly propagate, eventually
   266      * terminating all workers after long periods of non-use.
   267      *
   268      * Submissions. External submissions are maintained in an
   269      * array-based queue that is structured identically to
   270      * ForkJoinWorkerThread queues except for the use of
   271      * submissionLock in method addSubmission. Unlike the case for
   272      * worker queues, multiple external threads can add new
   273      * submissions, so adding requires a lock.
   274      *
   275      * Compensation. Beyond work-stealing support and lifecycle
   276      * control, the main responsibility of this framework is to take
   277      * actions when one worker is waiting to join a task stolen (or
   278      * always held by) another.  Because we are multiplexing many
   279      * tasks on to a pool of workers, we can't just let them block (as
   280      * in Thread.join).  We also cannot just reassign the joiner's
   281      * run-time stack with another and replace it later, which would
   282      * be a form of "continuation", that even if possible is not
   283      * necessarily a good idea since we sometimes need both an
   284      * unblocked task and its continuation to progress. Instead we
   285      * combine two tactics:
   286      *
   287      *   Helping: Arranging for the joiner to execute some task that it
   288      *      would be running if the steal had not occurred.  Method
   289      *      ForkJoinWorkerThread.joinTask tracks joining->stealing
   290      *      links to try to find such a task.
   291      *
   292      *   Compensating: Unless there are already enough live threads,
   293      *      method tryPreBlock() may create or re-activate a spare
   294      *      thread to compensate for blocked joiners until they
   295      *      unblock.
   296      *
   297      * The ManagedBlocker extension API can't use helping so relies
   298      * only on compensation in method awaitBlocker.
   299      *
   300      * It is impossible to keep exactly the target parallelism number
   301      * of threads running at any given time.  Determining the
   302      * existence of conservatively safe helping targets, the
   303      * availability of already-created spares, and the apparent need
   304      * to create new spares are all racy and require heuristic
   305      * guidance, so we rely on multiple retries of each.  Currently,
   306      * in keeping with on-demand signalling policy, we compensate only
   307      * if blocking would leave less than one active (non-waiting,
   308      * non-blocked) worker. Additionally, to avoid some false alarms
   309      * due to GC, lagging counters, system activity, etc, compensated
   310      * blocking for joins is only attempted after rechecks stabilize
   311      * (retries are interspersed with Thread.yield, for good
   312      * citizenship).  The variable blockedCount, incremented before
   313      * blocking and decremented after, is sometimes needed to
   314      * distinguish cases of waiting for work vs blocking on joins or
   315      * other managed sync. Both cases are equivalent for most pool
   316      * control, so we can update non-atomically. (Additionally,
   317      * contention on blockedCount alleviates some contention on ctl).
   318      *
   319      * Shutdown and Termination. A call to shutdownNow atomically sets
   320      * the ctl stop bit and then (non-atomically) sets each worker's
   321      * "terminate" status, cancels all unprocessed tasks, and wakes up
   322      * all waiting workers.  Detecting whether termination should
   323      * commence after a non-abrupt shutdown() call requires more work
   324      * and bookkeeping. We need consensus about quiescence (i.e., that
   325      * there is no more work) which is reflected in active counts so
   326      * long as there are no current blockers, as well as possible
   327      * re-evaluations during independent changes in blocking or
   328      * quiescing workers.
   329      *
   330      * Style notes: There is a lot of representation-level coupling
   331      * among classes ForkJoinPool, ForkJoinWorkerThread, and
   332      * ForkJoinTask.  Most fields of ForkJoinWorkerThread maintain
   333      * data structures managed by ForkJoinPool, so are directly
   334      * accessed.  Conversely we allow access to "workers" array by
   335      * workers, and direct access to ForkJoinTask.status by both
   336      * ForkJoinPool and ForkJoinWorkerThread.  There is little point
   337      * trying to reduce this, since any associated future changes in
   338      * representations will need to be accompanied by algorithmic
   339      * changes anyway. All together, these low-level implementation
   340      * choices produce as much as a factor of 4 performance
   341      * improvement compared to naive implementations, and enable the
   342      * processing of billions of tasks per second, at the expense of
   343      * some ugliness.
   344      *
   345      * Methods signalWork() and scan() are the main bottlenecks so are
   346      * especially heavily micro-optimized/mangled.  There are lots of
   347      * inline assignments (of form "while ((local = field) != 0)")
   348      * which are usually the simplest way to ensure the required read
   349      * orderings (which are sometimes critical). This leads to a
   350      * "C"-like style of listing declarations of these locals at the
   351      * heads of methods or blocks.  There are several occurrences of
   352      * the unusual "do {} while (!cas...)"  which is the simplest way
   353      * to force an update of a CAS'ed variable. There are also other
   354      * coding oddities that help some methods perform reasonably even
   355      * when interpreted (not compiled).
   356      *
   357      * The order of declarations in this file is: (1) declarations of
   358      * statics (2) fields (along with constants used when unpacking
   359      * some of them), listed in an order that tends to reduce
   360      * contention among them a bit under most JVMs.  (3) internal
   361      * control methods (4) callbacks and other support for
   362      * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
   363      * methods (plus a few little helpers). (6) static block
   364      * initializing all statics in a minimally dependent order.
   365      */
   366 
   367     /**
   368      * Factory for creating new {@link ForkJoinWorkerThread}s. Pools
   369      * that run {@code ForkJoinWorkerThread} subclasses extending base
   370      * functionality, or threads initialized with different contexts,
   371      * must be given a custom {@code ForkJoinWorkerThreadFactory}.
   372      */
   373     public static interface ForkJoinWorkerThreadFactory {
   374         /**
   375          * Returns a new worker thread operating in the given pool.
   376          * @param pool the pool this thread works in
   377          * @return the newly created worker thread
   378          * @throws NullPointerException if the pool is null
   379          */
   380         ForkJoinWorkerThread newThread(ForkJoinPool pool);
   381     }
   382 
   383     /**
   384      * Default ForkJoinWorkerThreadFactory implementation: every call
   385      * constructs a fresh, plain ForkJoinWorkerThread for the given pool.
   386      */
   387     static class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
   388         @Override
   389         public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
   390             return new ForkJoinWorkerThread(pool);
   391         }
   392     }
   393 
   394     /**
   395      * The default thread factory; creates a new ForkJoinWorkerThread for
   396      * each request. It is used unless overridden in ForkJoinPool constructors.
   397      */
   398     public static final ForkJoinWorkerThreadFactory
   399         defaultForkJoinWorkerThreadFactory; // set in static block
   400
   401     /**
   402      * Permission required for callers of methods that may start or
   403      * kill threads; verified by {@link #checkPermission()}.
   404      */
   405     private static final RuntimePermission modifyThreadPermission; // set in static block
   406 
   407     /**
   408      * If there is a security manager, makes sure caller has
   409      * permission to modify threads.
   410      */
   411     private static void checkPermission() {
   412         SecurityManager security = System.getSecurityManager();
   413         if (security != null)
   414             security.checkPermission(modifyThreadPermission);
   415     }
   416 
   417     /**
   418      * Generator for assigning sequence numbers as pool names.
   419      */
   420     private static final AtomicInteger poolNumberGenerator; // set in static block
   421
   422     /**
   423      * Generator for initial random seeds for worker victim
   424      * selection. This is used only to create initial seeds. Random
   425      * steals use a cheaper xorshift generator per steal attempt. We
   426      * don't expect much contention on workerSeedGenerator, so just use
   427      * a plain Random.
   428      */
   429     static final Random workerSeedGenerator; // set in static block
   430 
   431     /**
   432      * Array holding all worker threads in the pool.  Initialized upon
   433      * construction. Array size must be a power of two.  Updates and
   434      * replacements are protected by scanGuard, but the array is
   435      * always kept in a consistent enough state to be randomly
   436      * accessed without locking by workers performing work-stealing,
   437      * as well as other traversal-based methods in this class, so long
   438      * as reads memory-acquire by first reading ctl. All readers must
   439      * tolerate that some array slots may be null.
   440      */
   441     ForkJoinWorkerThread[] workers; // not volatile: readers must read ctl first
   442
   443     /**
   444      * Initial size for submission queue array. Must be a power of
   445      * two.  In many applications the queue always stays small, so we
   446      * use a small initial capacity.
   447      */
   448     private static final int INITIAL_QUEUE_CAPACITY = 8; // 1 << 3
   449
   450     /**
   451      * Maximum size for submission queue array. Must be a power of two
   452      * less than or equal to 1 << (31 - width of array entry) to
   453      * ensure lack of index wraparound, but is capped at a lower
   454      * value to help users trap runaway computations.
   455      */
   456     private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M entries
   457
   458     /**
   459      * Array serving as submission queue. Initialized upon construction.
   460      */
   461     private ForkJoinTask<?>[] submissionQueue;
   462
   463     /**
   464      * Lock protecting the submission queue array in addSubmission.
   465      */
   466     private final ReentrantLock submissionLock;
   467
   468     /**
   469      * Condition for awaitTermination, using submissionLock for
   470      * convenience.
   471      */
   472     private final Condition termination;
   473 
   474     /**
   475      * Factory used to create this pool's worker threads.
   476      */
   477     private final ForkJoinWorkerThreadFactory factory;
   478
   479     /**
   480      * The uncaught exception handler used when any worker abruptly
   481      * terminates.
   482      */
   483     final Thread.UncaughtExceptionHandler ueh;
   484
   485     /**
   486      * Prefix for assigning names to worker threads.
   487      */
   488     private final String workerNamePrefix;
   489
   490     /**
   491      * Sum of per-thread steal counts, updated only when threads are
   492      * idle or terminating.
   493      */
   494     private volatile long stealCount;
   495 
   496     /**
   497      * Main pool control -- a long packed with:
   498      * AC: Number of active running workers minus target parallelism (16 bits)
   499      * TC: Number of total workers minus target parallelism (16 bits)
   500      * ST: true if pool is terminating (1 bit)
   501      * EC: the wait count of top waiting thread (15 bits)
   502      * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
   503      *
   504      * When convenient, we can extract the upper 32 bits of counts and
   505      * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
   506      * (int)ctl.  The ec field is never accessed alone, but always
   507      * together with id and st. The offsets of counts by the target
   508      * parallelism and the positionings of fields make it possible to
   509      * perform the most common checks via sign tests of fields: When
   510      * ac is negative, there are not enough active workers, when tc is
   511      * negative, there are not enough total workers, when id is
   512      * negative, there is at least one waiting worker, and when e is
   513      * negative, the pool is terminating.  To deal with these possibly
   514      * negative fields, we use casts in and out of "short" and/or
   515      * signed shifts to maintain signedness.
   516      */
   517     volatile long ctl;
   518
   519     // bit positions/shifts for fields (ID occupies bits 0-15)
   520     private static final int  AC_SHIFT   = 48;  // AC: bits 48-63
   521     private static final int  TC_SHIFT   = 32;  // TC: bits 32-47
   522     private static final int  ST_SHIFT   = 31;  // ST: bit 31
   523     private static final int  EC_SHIFT   = 16;  // EC: bits 16-30
   524
   525     // bounds
   526     private static final int  MAX_ID     = 0x7fff;  // max poolIndex
   527     private static final int  SMASK      = 0xffff;  // mask short bits
   528     private static final int  SHORT_SIGN = 1 << 15; // sign bit of a 16-bit field
   529     private static final int  INT_SIGN   = 1 << 31; // sign bit of an int
   530
   531     // masks
   532     private static final long STOP_BIT   = 0x0001L << ST_SHIFT;       // the ST bit
   533     private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT; // all AC bits
   534     private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT; // all TC bits
   535
   536     // units for incrementing and decrementing
   537     private static final long TC_UNIT    = 1L << TC_SHIFT; // one unit of TC
   538     private static final long AC_UNIT    = 1L << AC_SHIFT; // one unit of AC
   539
   540     // masks and units for dealing with u = (int)(ctl >>> 32)
   541     private static final int  UAC_SHIFT  = AC_SHIFT - 32; // = 16
   542     private static final int  UTC_SHIFT  = TC_SHIFT - 32; // = 0
   543     private static final int  UAC_MASK   = SMASK << UAC_SHIFT; // AC bits of u
   544     private static final int  UTC_MASK   = SMASK << UTC_SHIFT; // TC bits of u
   545     private static final int  UAC_UNIT   = 1 << UAC_SHIFT; // one unit of AC in u
   546     private static final int  UTC_UNIT   = 1 << UTC_SHIFT; // one unit of TC in u
   547
   548     // masks and units for dealing with e = (int)ctl
   549     private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
   550     private static final int  EC_UNIT    = 1 << EC_SHIFT; // one unit of EC (wait count)
   551 
   552     /**
   553      * The target parallelism level.
   554      */
   555     final int parallelism;
   556
   557     /**
   558      * Index (mod submission queue length) of next element to take
   559      * from submission queue. Usage is identical to that for
   560      * per-worker queues -- see ForkJoinWorkerThread internal
   561      * documentation.
   562      */
   563     volatile int queueBase;
   564
   565     /**
   566      * Index (mod submission queue length) of next element to add
   567      * in submission queue. Usage is identical to that for
   568      * per-worker queues -- see ForkJoinWorkerThread internal
   569      * documentation.
   570      */
   571     int queueTop; // NOTE(review): non-volatile; assumed written only under submissionLock
   572
   573     /**
   574      * True when shutdown() has been called.
   575      */
   576     volatile boolean shutdown;
   577
   578     /**
   579      * True if workers use local FIFO, not default LIFO, ordering for
   580      * local polling. Read by, and replicated by, ForkJoinWorkerThreads.
   581      */
   582     final boolean locallyFifo;
   583
   584     /**
   585      * The number of threads in ForkJoinWorkerThread.helpQuiescePool.
   586      * When non-zero, suppresses automatic shutdown when active
   587      * counts become zero.
   588      */
   589     volatile int quiescerCount;
   590
   591     /**
   592      * The number of threads blocked in joins or other managed synchronization.
   593      */
   594     volatile int blockedCount;
   595
   596     /**
   597      * Counter for worker Thread names (unrelated to their poolIndex).
   598      */
   599     private volatile int nextWorkerNumber;
   600
   601     /**
   602      * The index for the next created worker. Accessed under scanGuard.
   603      */
   604     private int nextWorkerIndex;
   605
   606     /**
   607      * SeqLock and index masking for updates to workers array.  Locked
   608      * when SG_UNIT is set. Unlocking clears bit by adding
   609      * SG_UNIT. Staleness of read-only operations can be checked by
   610      * comparing scanGuard to value before the reads. The low 16 bits
   611      * (i.e., anding with SMASK) hold the smallest power of two
   612      * covering all worker indices, minus one, and are used to avoid
   613      * dealing with large numbers of null slots when the workers array
   614      * is overallocated.
   615      */
   616     volatile int scanGuard;
   617
   618     private static final int SG_UNIT = 1 << 16; // scanGuard lock bit, just above the SMASK bits
   619
   620     /**
   621      * The wakeup interval (in nanoseconds) for a worker waiting for a
   622      * task when the pool is quiescent to instead try to shrink the
   623      * number of workers.  The exact value does not matter too
   624      * much. It must be short enough to release resources during
   625      * sustained periods of idleness, but not so short that threads
   626      * are continually re-created.
   627      */
   628     private static final long SHRINK_RATE =
   629         4L * 1000L * 1000L * 1000L; // 4 seconds
   630 
   631     /**
   632      * Top-level loop for worker threads: On each step: if the
   633      * previous step swept through all queues and found no tasks, or
   634      * there are excess threads, then possibly blocks. Otherwise,
   635      * scans for and, if found, executes a task. Returns when pool
   636      * and/or worker terminate.
   637      *
   638      * @param w the worker
   639      */
   640     final void work(ForkJoinWorkerThread w) {
   641         boolean swept = false;                // true on empty scans
   642         long c;
   643         while (!w.terminate && (int)(c = ctl) >= 0) {
   644             int a;                            // active count
   645             if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
   646                 swept = scan(w, a);
   647             else if (tryAwaitWork(w, c))
   648                 swept = false;
   649         }
   650     }
   651 
   652     // Signalling
   653 
   654     /**
   655      * Wakes up or creates a worker.  In this section it is called
            * after a submission is queued (addSubmission), after a steal
            * that leaves the victim's queue nonempty (scan), and when a
            * worker dies abnormally (deregisterWorker).
   656      */
   657     final void signalWork() {
   658         /*
   659          * The while condition is true if: (there are too few total
   660          * workers OR there is at least one waiter) AND (there are too
   661          * few active workers OR the pool is terminating).  The value
   662          * of e distinguishes the remaining cases: zero (no waiters)
   663          * for create, negative if terminating (in which case do
   664          * nothing), else release a waiter. The secondary checks for
   665          * release (non-null array etc) can fail if the pool begins
   666          * terminating after the test, and don't impose any added cost
   667          * because JVMs must perform null and bounds checks anyway.
                * A failed CAS simply re-reads ctl and re-evaluates the loop.
   668          */
   669         long c; int e, u;
   670         while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
   671                 (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
   672             if (e > 0) {                         // release a waiting worker
   673                 int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
   674                 if ((ws = workers) == null ||
   675                     (i = ~e & SMASK) >= ws.length ||
   676                     (w = ws[i]) == null)
   677                     break;
                       // Pop w: its recorded successor (nextWait, set in
                       // tryAwaitWork) becomes the new wait-queue head, and the
                       // active count in the upper half goes up by UAC_UNIT.
   678                 long nc = (((long)(w.nextWait & E_MASK)) |
   679                            ((long)(u + UAC_UNIT) << 32));
   680                 if (w.eventCount == e &&
   681                     UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                           // changing eventCount lets w leave its wait loop
   682                     w.eventCount = (e + EC_UNIT) & E_MASK;
   683                     if (w.parked)
   684                         UNSAFE.unpark(w);
   685                     break;
   686                 }
   687             }
                   // e == 0 (no waiters): bump both the total and the active
                   // counts, then create the worker they account for.
   688             else if (UNSAFE.compareAndSwapLong
   689                      (this, ctlOffset, c,
   690                       (long)(((u + UTC_UNIT) & UTC_MASK) |
   691                              ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
   692                 addWorker();
   693                 break;
   694             }
   695         }
   696     }
   697 
   698     /**
   699      * Variant of signalWork to help release waiters on rescans.
   700      * Tries once to release a waiter if active count < 0.
            * Unlike signalWork it never creates a worker, and it also
            * returns true when there was nothing to release.
   701      *
   702      * @return false if failed due to contention, else true
   703      */
   704     private boolean tryReleaseWaiter() {
   705         long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
   706         if ((e = (int)(c = ctl)) > 0 &&
   707             (int)(c >> AC_SHIFT) < 0 &&
   708             (ws = workers) != null &&
   709             (i = ~e & SMASK) < ws.length &&
   710             (w = ws[i]) != null) {
                   // pop the head waiter w (its nextWait becomes the head)
                   // and increment the active count
   711             long nc = ((long)(w.nextWait & E_MASK) |
   712                        ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
                   // w already released by someone else, or ctl changed
   713             if (w.eventCount != e ||
   714                 !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
   715                 return false;
   716             w.eventCount = (e + EC_UNIT) & E_MASK;
   717             if (w.parked)
   718                 UNSAFE.unpark(w);
   719         }
   720         return true;
   721     }
   722 
   723     // Scanning for tasks
   724 
   725     /**
   726      * Scans for and, if found, executes one task. Scans start at a
   727      * random index of workers array, and randomly select the first
   728      * (2*#workers)-1 probes, and then, if all empty, resort to 2
   729      * circular sweeps, which are necessary to check quiescence, and
   730      * take a submission only if no stealable tasks were found.  The
   731      * steal code inside the loop is a specialized form of
   732      * ForkJoinWorkerThread.deqTask, followed by bookkeeping to support
   733      * helpJoinTask and signal propagation. The code for submission
   734      * queues is almost identical. On each steal, the worker completes
   735      * not only the task, but also all local tasks that this task may
   736      * have generated. On detecting staleness or contention when
   737      * trying to take a task, this method returns without finishing
   738      * sweep, which allows global state rechecks before retry.
   739      *
   740      * @param w the worker
   741      * @param a the number of active workers (the active-count field of
            * ctl; work calls scan only when it is <= 0)
   742      * @return true if swept all queues without finding a task
   743      */
   744     private boolean scan(ForkJoinWorkerThread w, int a) {
   745         int g = scanGuard; // mask 0 avoids useless scans if only one active
   746         int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
   747         ForkJoinWorkerThread[] ws = workers;
   748         if (ws == null || ws.length <= m)         // staleness check
   749             return false;
               // j < 0: 2m probes at xorshift-random indices (starting from
               // w.seed); j >= 0: 2m+1 probes at consecutive indices
   750         for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
   751             ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
   752             ForkJoinWorkerThread v = ws[k & m];
   753             if (v != null && (b = v.queueBase) != v.queueTop &&
   754                 (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                       // raw offset of q[i] for the Unsafe CAS below
   755                 long u = (i << ASHIFT) + ABASE;
   756                 if ((t = q[i]) != null && v.queueBase == b &&
   757                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
   758                     int d = (v.queueBase = b + 1) - v.queueTop;
   759                     v.stealHint = w.poolIndex;
   760                     if (d != 0)
   761                         signalWork();             // propagate if nonempty
   762                     w.execTask(t);
   763                 }
                       // a nonempty queue was seen (whether or not the take
                       // succeeded): advance w's seed and report "not swept"
   764                 r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
   765                 return false;                     // store next seed
   766             }
   767             else if (j < 0) {                     // xorshift
   768                 r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
   769             }
   770             else
   771                 ++k;
   772         }
   773         if (scanGuard != g)                       // staleness check
   774             return false;
   775         else {                                    // try to take submission
   776             ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
   777             if ((b = queueBase) != queueTop &&
   778                 (q = submissionQueue) != null &&
   779                 (i = (q.length - 1) & b) >= 0) {
   780                 long u = (i << ASHIFT) + ABASE;
   781                 if ((t = q[i]) != null && queueBase == b &&
   782                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
   783                     queueBase = b + 1;
   784                     w.execTask(t);
   785                 }
   786                 return false;
   787             }
   788             return true;                         // all queues empty
   789         }
   790     }
   791 
   792     /**
   793      * Tries to enqueue worker w in wait queue and await change in
   794      * worker's eventCount.  If the pool is quiescent and there is
   795      * more than one worker, possibly terminates worker upon exit.
   796      * Otherwise, before blocking, rescans queues to avoid missed
   797      * signals.  Upon finding work, releases at least one worker
   798      * (which may be the current worker). Rescans restart upon
   799      * detected staleness or failure to release due to
   800      * contention. Note the unusual conventions about Thread.interrupt
   801      * here and elsewhere: Because interrupts are used solely to alert
   802      * threads to check termination, which is checked here anyway, we
   803      * clear status (using Thread.interrupted) before any call to
   804      * park, so that park does not immediately return due to status
   805      * being set via some other unrelated call to interrupt in user
   806      * code.
   807      *
   808      * @param w the calling worker
   809      * @param c the ctl value on entry
   810      * @return true if waited or another thread was released upon enq
   811      */
   812     private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
   813         int v = w.eventCount;
   814         w.nextWait = (int)c;                      // w's successor record
               // push w: its masked eventCount becomes the wait-queue head,
               // and the active count drops by one
   815         long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
   816         if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   817             long d = ctl; // return true if lost to a deq, to force scan
   818             return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
   819         }
               // On CAS failure keep retrying, unless w has already been
               // released (then leave the steal count for a later attempt).
   820         for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
   821             long s = stealCount;
   822             if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
   823                 sc = w.stealCount = 0;
   824             else if (w.eventCount != v)
   825                 return true;                      // update next time
   826         }
               // Idle-wait (possibly shrinking the pool) only if termination did
               // not just begin, another worker was already queued
               // ((int)c != 0), no workers remain active, and no thread is
               // blocked or quiescing.
   827         if ((!shutdown || !tryTerminate(false)) &&
   828             (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
   829             blockedCount == 0 && quiescerCount == 0)
   830             idleAwaitWork(w, nc, c, v);           // quiescent
               // Rescan once before each park; a nonempty queue triggers
               // tryReleaseWaiter, which may release w itself.
   831         for (boolean rescanned = false;;) {
   832             if (w.eventCount != v)
   833                 return true;
   834             if (!rescanned) {
   835                 int g = scanGuard, m = g & SMASK;
   836                 ForkJoinWorkerThread[] ws = workers;
   837                 if (ws != null && m < ws.length) {
   838                     rescanned = true;
   839                     for (int i = 0; i <= m; ++i) {
   840                         ForkJoinWorkerThread u = ws[i];
   841                         if (u != null) {
   842                             if (u.queueBase != u.queueTop &&
   843                                 !tryReleaseWaiter())
   844                                 rescanned = false; // contended
   845                             if (w.eventCount != v)
   846                                 return true;
   847                         }
   848                     }
   849                 }
   850                 if (scanGuard != g ||              // stale
   851                     (queueBase != queueTop && !tryReleaseWaiter()))
   852                     rescanned = false;
   853                 if (!rescanned)
   854                     Thread.yield();                // reduce contention
   855                 else
   856                     Thread.interrupted();          // clear before park
   857             }
   858             else {
   859                 w.parked = true;                   // must recheck
   860                 if (w.eventCount != v) {
   861                     w.parked = false;
   862                     return true;
   863                 }
                       // park until unparked (possibly spuriously), then
                       // loop back to recheck eventCount and rescan
   864                 LockSupport.park(this);
   865                 rescanned = w.parked = false;
   866             }
   867         }
   868     }
   869 
   870     /**
   871      * If inactivating worker w has caused pool to become
   872      * quiescent, check for pool termination, and wait for event
   873      * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
   874      * this case because quiescence reflects consensus about lack
   875      * of work). On timeout, if ctl has not changed, terminate the
   876      * worker. Upon its termination (see deregisterWorker), it may
   877      * wake up another worker to possibly repeat this process.
   878      *
   879      * @param w the calling worker
   880      * @param currentCtl the ctl value after enqueuing w
   881      * @param prevCtl the ctl value before w was enqueued; restored
            * if w is terminated
   882      * @param v the eventCount w awaits change
   883      */
   884     private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
   885                                long prevCtl, int v) {
   886         if (w.eventCount == v) {
   887             if (shutdown)
   888                 tryTerminate(false);
   889             ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
                   // Wait only while ctl is unchanged since w was enqueued; any
                   // other update (enqueue, release, count change) ends it.
   890             while (ctl == currentCtl) {
   891                 long startTime = System.nanoTime();
   892                 w.parked = true;
   893                 if (w.eventCount == v)             // must recheck
   894                     LockSupport.parkNanos(this, SHRINK_RATE);
   895                 w.parked = false;
   896                 if (w.eventCount != v)
   897                     break;
                       // woke before ~90% of SHRINK_RATE elapsed: wait again
   898                 else if (System.nanoTime() - startTime <
   899                          SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
   900                     Thread.interrupted();          // spurious wakeup
                       // Full timeout with ctl unchanged: restore the
                       // pre-enqueue ctl, mark w to terminate, and advance its
                       // eventCount so the caller's wait loop exits.
   901                 else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
   902                                                    currentCtl, prevCtl)) {
   903                     w.terminate = true;            // restore previous
   904                     w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
   905                     break;
   906                 }
   907             }
   908         }
   909     }
   910 
   911     // Submissions
   912 
   913     /**
   914      * Enqueues the given task in the submissionQueue.  Same idea as
   915      * ForkJoinWorkerThread.pushTask except for use of submissionLock.
            * If submissionQueue is null the task is dropped, but signalWork
            * is still called.
   916      *
   917      * @param t the task
   918      */
   919     private void addSubmission(ForkJoinTask<?> t) {
   920         final ReentrantLock lock = this.submissionLock;
   921         lock.lock();
   922         try {
   923             ForkJoinTask<?>[] q; int s, m;
   924             if ((q = submissionQueue) != null) {    // ignore if queue removed
                       // raw offset of slot (queueTop & mask) for Unsafe access
   925                 long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                       // ordered store of the task, before queueTop is advanced
   926                 UNSAFE.putOrderedObject(q, u, t);
   927                 queueTop = s + 1;
                       // the array is now completely full: grow it
   928                 if (s - queueBase == m)
   929                     growSubmissionQueue();
   930             }
   931         } finally {
   932             lock.unlock();
   933         }
               // wake or create a worker outside the lock
   934         signalWork();
   935     }
   936 
   937     //  (pollSubmission is defined below with exported methods)
   938 
   939     /**
   940      * Creates or doubles submissionQueue array.
   941      * Basically identical to ForkJoinWorkerThread version.
            * Called from addSubmission while holding submissionLock.
            *
            * @throws RejectedExecutionException if the doubled size would
            * exceed MAXIMUM_QUEUE_CAPACITY
   942      */
   943     private void growSubmissionQueue() {
   944         ForkJoinTask<?>[] oldQ = submissionQueue;
   945         int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
   946         if (size > MAXIMUM_QUEUE_CAPACITY)
   947             throw new RejectedExecutionException("Queue capacity exceeded");
   948         if (size < INITIAL_QUEUE_CAPACITY)
   949             size = INITIAL_QUEUE_CAPACITY;
               // The new array is installed before the transfer.  Indexing
               // with size - 1 assumes size is a power of two (presumably
               // INITIAL_QUEUE_CAPACITY is one -- confirm).
   950         ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
   951         int mask = size - 1;
   952         int top = queueTop;
   953         int oldMask;
   954         if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
                   // Move each pending task; a slot that is already null, or
                   // whose task is taken concurrently (CAS fails), is skipped.
   955             for (int b = queueBase; b != top; ++b) {
   956                 long u = ((b & oldMask) << ASHIFT) + ABASE;
   957                 Object x = UNSAFE.getObjectVolatile(oldQ, u);
   958                 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
   959                     UNSAFE.putObjectVolatile
   960                         (q, ((b & mask) << ASHIFT) + ABASE, x);
   961             }
   962         }
   963     }
   964 
   965     // Blocking support
   966 
   967     /**
   968      * Tries to increment blockedCount, decrement active count
   969      * (sometimes implicitly) and possibly release or create a
   970      * compensating worker in preparation for blocking. Fails
   971      * on contention or termination.
   972      *
   973      * @return true if the caller can block, else should recheck and retry
   974      */
   975     private boolean tryPreBlock() {
   976         int b = blockedCount;
   977         if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
   978             int pc = parallelism;
   979             do {
   980                 ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
   981                 int e, ac, tc, rc, i;
   982                 long c = ctl;
   983                 int u = (int)(c >>> 32);
   984                 if ((e = (int)c) < 0) {
   985                                                  // skip -- terminating
   986                 }
   987                 else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
   988                          (ws = workers) != null &&
   989                          (i = ~e & SMASK) < ws.length &&
   990                          (w = ws[i]) != null) {
   991                     long nc = ((long)(w.nextWait & E_MASK) |
   992                                (c & (AC_MASK|TC_MASK)));
   993                     if (w.eventCount == e &&
   994                         UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   995                         w.eventCount = (e + EC_UNIT) & E_MASK;
   996                         if (w.parked)
   997                             UNSAFE.unpark(w);
   998                         return true;             // release an idle worker
   999                     }
  1000                 }
  1001                 else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
  1002                     long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
  1003                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
  1004                         return true;             // no compensation needed
  1005                 }
  1006                 else if (tc + pc < MAX_ID) {
  1007                     long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
  1008                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
  1009                         addWorker();
  1010                         return true;            // create a replacement
  1011                     }
  1012                 }
  1013                 // try to back out on any failure and let caller retry
  1014             } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
  1015                                                b = blockedCount, b - 1));
  1016         }
  1017         return false;
  1018     }
  1019 
  1020     /**
  1021      * Decrements blockedCount and increments active count
  1022      */
  1023     private void postBlock() {
  1024         long c;
  1025         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
  1026                                                 c = ctl, c + AC_UNIT));
  1027         int b;
  1028         do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
  1029                                                b = blockedCount, b - 1));
  1030     }
  1031 
  1032     /**
  1033      * Possibly blocks waiting for the given task to complete, or
  1034      * cancels the task if terminating.  Fails to wait if contended.
  1035      *
  1036      * @param joinMe the task
  1037      */
  1038     final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
  1039         int s;
  1040         Thread.interrupted(); // clear interrupts before checking termination
  1041         if (joinMe.status >= 0) {
  1042             if (tryPreBlock()) {
  1043                 joinMe.tryAwaitDone(0L);
  1044                 postBlock();
  1045             }
  1046             else if ((ctl & STOP_BIT) != 0L)
  1047                 joinMe.cancelIgnoringExceptions();
  1048         }
  1049     }
  1050 
  1051     /**
  1052      * Possibly blocks the given worker waiting for joinMe to
  1053      * complete or timeout
  1054      *
  1055      * @param joinMe the task
  1056      * @param millis the wait time for underlying Object.wait
  1057      */
  1058     final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) {
  1059         while (joinMe.status >= 0) {
  1060             Thread.interrupted();
  1061             if ((ctl & STOP_BIT) != 0L) {
  1062                 joinMe.cancelIgnoringExceptions();
  1063                 break;
  1064             }
  1065             if (tryPreBlock()) {
  1066                 long last = System.nanoTime();
  1067                 while (joinMe.status >= 0) {
  1068                     long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
  1069                     if (millis <= 0)
  1070                         break;
  1071                     joinMe.tryAwaitDone(millis);
  1072                     if (joinMe.status < 0)
  1073                         break;
  1074                     if ((ctl & STOP_BIT) != 0L) {
  1075                         joinMe.cancelIgnoringExceptions();
  1076                         break;
  1077                     }
  1078                     long now = System.nanoTime();
  1079                     nanos -= now - last;
  1080                     last = now;
  1081                 }
  1082                 postBlock();
  1083                 break;
  1084             }
  1085         }
  1086     }
  1087 
  1088     /**
  1089      * If necessary, compensates for blocker, and blocks
  1090      */
  1091     private void awaitBlocker(ManagedBlocker blocker)
  1092         throws InterruptedException {
  1093         while (!blocker.isReleasable()) {
  1094             if (tryPreBlock()) {
  1095                 try {
  1096                     do {} while (!blocker.isReleasable() && !blocker.block());
  1097                 } finally {
  1098                     postBlock();
  1099                 }
  1100                 break;
  1101             }
  1102         }
  1103     }
  1104 
  1105     // Creating, registering and deregistering workers
  1106 
  1107     /**
  1108      * Tries to create and start a worker; minimally rolls back counts
  1109      * on failure.
  1110      */
  1111     private void addWorker() {
  1112         Throwable ex = null;
  1113         ForkJoinWorkerThread t = null;
  1114         try {
  1115             t = factory.newThread(this);
  1116         } catch (Throwable e) {
  1117             ex = e;
  1118         }
  1119         if (t == null) {  // null or exceptional factory return
  1120             long c;       // adjust counts
  1121             do {} while (!UNSAFE.compareAndSwapLong
  1122                          (this, ctlOffset, c = ctl,
  1123                           (((c - AC_UNIT) & AC_MASK) |
  1124                            ((c - TC_UNIT) & TC_MASK) |
  1125                            (c & ~(AC_MASK|TC_MASK)))));
  1126             // Propagate exception if originating from an external caller
  1127             if (!tryTerminate(false) && ex != null &&
  1128                 !(Thread.currentThread() instanceof ForkJoinWorkerThread))
  1129                 UNSAFE.throwException(ex);
  1130         }
  1131         else
  1132             t.start();
  1133     }
  1134 
  1135     /**
  1136      * Callback from ForkJoinWorkerThread constructor to assign a
  1137      * public name
  1138      */
  1139     final String nextWorkerName() {
  1140         for (int n;;) {
  1141             if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
  1142                                          n = nextWorkerNumber, ++n))
  1143                 return workerNamePrefix + n;
  1144         }
  1145     }
  1146 
  1147     /**
  1148      * Callback from ForkJoinWorkerThread constructor to
  1149      * determine its poolIndex and record in workers array.
            * If workers is null (shutdown), w is not recorded and the
            * unmodified nextWorkerIndex is returned.
  1150      *
  1151      * @param w the worker
  1152      * @return the worker's pool index
  1153      */
  1154     final int registerWorker(ForkJoinWorkerThread w) {
  1155         /*
  1156          * In the typical case, a new worker acquires the lock, uses
  1157          * next available index and returns quickly.  Since we should
  1158          * not block callers (ultimately from signalWork or
  1159          * tryPreBlock) waiting for the lock needed to do this, we
  1160          * instead help release other workers while waiting for the
  1161          * lock.
  1162          */
  1163         for (int g;;) {
  1164             ForkJoinWorkerThread[] ws;
                   // acquire the scanGuard lock; local g keeps the unlocked value
  1165             if (((g = scanGuard) & SG_UNIT) == 0 &&
  1166                 UNSAFE.compareAndSwapInt(this, scanGuardOffset,
  1167                                          g, g | SG_UNIT)) {
  1168                 int k = nextWorkerIndex;
  1169                 try {
  1170                     if ((ws = workers) != null) { // ignore on shutdown
  1171                         int n = ws.length;
                               // hinted slot unusable: take the first null slot,
                               // doubling the array if there is none
  1172                         if (k < 0 || k >= n || ws[k] != null) {
  1173                             for (k = 0; k < n && ws[k] != null; ++k)
  1174                                 ;
  1175                             if (k == n)
  1176                                 ws = workers = Arrays.copyOf(ws, n << 1);
  1177                         }
  1178                         ws[k] = w;
  1179                         nextWorkerIndex = k + 1;
  1180                         int m = g & SMASK;
                               // Value published on unlock: if k lies beyond the
                               // current mask, only the widened mask (2m+1) is
                               // kept; otherwise the unlocked value advances by
                               // two SG_UNITs.
  1181                         g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
  1182                     }
  1183                 } finally {
                           // unlock: g never has the SG_UNIT bit set here
  1184                     scanGuard = g;
  1185                 }
  1186                 return k;
  1187             }
  1188             else if ((ws = workers) != null) { // help release others
  1189                 for (ForkJoinWorkerThread u : ws) {
  1190                     if (u != null && u.queueBase != u.queueTop) {
                               // true unless contended, so stop after one
                               // uncontended attempt
  1191                         if (tryReleaseWaiter())
  1192                             break;
  1193                     }
  1194                 }
  1195             }
  1196         }
  1197     }
  1198 
  1199     /**
  1200      * Final callback from terminating worker.  Removes record of
  1201      * worker from array, and adjusts counts. If pool is shutting
  1202      * down, tries to complete termination.
  1203      *
  1204      * @param w the worker
            * @param ex the exception that ended the worker, or null; a
            * non-null value is treated as abnormal death, and signalWork is
            * called so the worker may be replaced
  1205      */
  1206     final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
  1207         int idx = w.poolIndex;
  1208         int sc = w.stealCount;
  1209         int steps = 0;
  1210         // Remove from array, adjust worker counts and collect steal count.
  1211         // We can intermix failed removes or adjusts with steal updates
  1212         do {
  1213             long s, c;
  1214             int g;
                   // step 0 -> 1: under the scanGuard lock, clear w's slot and
                   // make idx the next index hint for registerWorker
  1215             if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
  1216                 UNSAFE.compareAndSwapInt(this, scanGuardOffset,
  1217                                          g, g |= SG_UNIT)) {
  1218                 ForkJoinWorkerThread[] ws = workers;
  1219                 if (ws != null && idx >= 0 &&
  1220                     idx < ws.length && ws[idx] == w)
  1221                     ws[idx] = null;    // verify
  1222                 nextWorkerIndex = idx;
                       // unlock: adding SG_UNIT clears the lock bit
  1223                 scanGuard = g + SG_UNIT;
  1224                 steps = 1;
  1225             }
                   // step 1 -> 2: remove one active and one total count
  1226             if (steps == 1 &&
  1227                 UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
  1228                                           (((c - AC_UNIT) & AC_MASK) |
  1229                                            ((c - TC_UNIT) & TC_MASK) |
  1230                                            (c & ~(AC_MASK|TC_MASK)))))
  1231                 steps = 2;
                   // fold w's steal count into the pool total
  1232             if (sc != 0 &&
  1233                 UNSAFE.compareAndSwapLong(this, stealCountOffset,
  1234                                           s = stealCount, s + sc))
  1235                 sc = 0;
  1236         } while (steps != 2 || sc != 0);
  1237         if (!tryTerminate(false)) {
  1238             if (ex != null)   // possibly replace if died abnormally
  1239                 signalWork();
  1240             else
  1241                 tryReleaseWaiter();
  1242         }
  1243     }
  1244 
  1245     // Shutdown and termination
  1246 
  1247     /**
  1248      * Possibly initiates and/or completes termination.
  1249      *
  1250      * @param now if true, unconditionally terminate, else only
  1251      * if shutdown and empty queue and no active workers, and no
            * threads are blocked (blockedCount) or quiescing (quiescerCount)
  1252      * @return true if now terminating or terminated
  1253      */
  1254     private boolean tryTerminate(boolean now) {
  1255         long c;
  1256         while (((c = ctl) & STOP_BIT) == 0) {
  1257             if (!now) {
                       // some workers are still active
  1258                 if ((int)(c >> AC_SHIFT) != -parallelism)
  1259                     return false;
  1260                 if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
  1261                     queueBase != queueTop) {
  1262                     if (ctl == c) // staleness check
  1263                         return false;
  1264                     continue;
  1265                 }
  1266             }
                   // the thread whose CAS sets STOP_BIT runs startTerminating;
                   // the loop then exits on the next read of ctl
  1267             if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
  1268                 startTerminating();
  1269         }
  1270         if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
                   // NOTE(review): termination is presumably a Condition of
                   // submissionLock (its waiters are not visible here)
  1271             final ReentrantLock lock = this.submissionLock;
  1272             lock.lock();
  1273             try {
  1274                 termination.signalAll();
  1275             } finally {
  1276                 lock.unlock();
  1277             }
  1278         }
  1279         return true;
  1280     }
  1281 
  1282     /**
  1283      * Runs up to three passes through workers: (0) Setting
  1284      * termination status for each worker, followed by wakeups up to
  1285      * queued workers; (1) helping cancel tasks; (2) interrupting
  1286      * lagging threads (likely in external tasks, but possibly also
  1287      * blocked in joins).  Each pass repeats previous steps because of
  1288      * potential lagging thread creation.
  1289      */
  1290     private void startTerminating() {
  1291         cancelSubmissions();
  1292         for (int pass = 0; pass < 3; ++pass) {
  1293             ForkJoinWorkerThread[] ws = workers;
  1294             if (ws != null) {
  1295                 for (ForkJoinWorkerThread w : ws) {
  1296                     if (w != null) {
  1297                         w.terminate = true;
  1298                         if (pass > 0) {
  1299                             w.cancelTasks();
  1300                             if (pass > 1 && !w.isInterrupted()) {
  1301                                 try {
  1302                                     w.interrupt();
  1303                                 } catch (SecurityException ignore) {
  1304                                 }
  1305                             }
  1306                         }
  1307                     }
  1308                 }
  1309                 terminateWaiters();
  1310             }
  1311         }
  1312     }
  1313 
  1314     /**
  1315      * Polls and cancels all submissions. Called only during termination.
  1316      */
  1317     private void cancelSubmissions() {
  1318         while (queueBase != queueTop) {
  1319             ForkJoinTask<?> task = pollSubmission();
  1320             if (task != null) {
  1321                 try {
  1322                     task.cancel(false);
  1323                 } catch (Throwable ignore) {
  1324                 }
  1325             }
  1326         }
  1327     }
  1328 
  1329     /**
  1330      * Tries to set the termination status of waiting workers, and
  1331      * then wakes them up (after which they will terminate).
  1332      */
  1333     private void terminateWaiters() {
  1334         ForkJoinWorkerThread[] ws = workers;
  1335         if (ws != null) {
  1336             ForkJoinWorkerThread w; long c; int i, e;
  1337             int n = ws.length;
  1338             while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
  1339                    (w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
  1340                 if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
  1341                                               (long)(w.nextWait & E_MASK) |
  1342                                               ((c + AC_UNIT) & AC_MASK) |
  1343                                               (c & (TC_MASK|STOP_BIT)))) {
  1344                     w.terminate = true;
  1345                     w.eventCount = e + EC_UNIT;
  1346                     if (w.parked)
  1347                         UNSAFE.unpark(w);
  1348                 }
  1349             }
  1350         }
  1351     }
  1352 
  1353     // misc ForkJoinWorkerThread support
  1354 
  1355     /**
  1356      * Increment or decrement quiescerCount. Needed only to prevent
  1357      * triggering shutdown if a worker is transiently inactive while
  1358      * checking quiescence.
  1359      *
  1360      * @param delta 1 for increment, -1 for decrement
  1361      */
  1362     final void addQuiescerCount(int delta) {
  1363         int c;
  1364         do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
  1365                                                c = quiescerCount, c + delta));
  1366     }
  1367 
  1368     /**
  1369      * Directly increment or decrement active count without
  1370      * queuing. This method is used to transiently assert inactivation
  1371      * while checking quiescence.
  1372      *
  1373      * @param delta 1 for increment, -1 for decrement
  1374      */
  1375     final void addActiveCount(int delta) {
  1376         long d = delta < 0 ? -AC_UNIT : AC_UNIT;
  1377         long c;
  1378         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
  1379                                                 ((c + d) & AC_MASK) |
  1380                                                 (c & ~AC_MASK)));
  1381     }
  1382 
  1383     /**
  1384      * Returns the approximate (non-atomic) number of idle threads per
  1385      * active thread.
  1386      */
  1387     final int idlePerActive() {
  1388         // Approximate at powers of two for small values, saturate past 4
  1389         int p = parallelism;
  1390         int a = p + (int)(ctl >> AC_SHIFT);
  1391         return (a > (p >>>= 1) ? 0 :
  1392                 a > (p >>>= 1) ? 1 :
  1393                 a > (p >>>= 1) ? 2 :
  1394                 a > (p >>>= 1) ? 4 :
  1395                 8);
  1396     }
  1397 
  1398     // Exported methods
  1399 
  1400     // Constructors
  1401 
  1402     /**
  1403      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
  1404      * java.lang.Runtime#availableProcessors}, using the {@linkplain
  1405      * #defaultForkJoinWorkerThreadFactory default thread factory},
  1406      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  1407      *
  1408      * @throws SecurityException if a security manager exists and
  1409      *         the caller is not permitted to modify threads
  1410      *         because it does not hold {@link
  1411      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1412      */
  1413     public ForkJoinPool() {
  1414         this(Runtime.getRuntime().availableProcessors(),
  1415              defaultForkJoinWorkerThreadFactory, null, false);
  1416     }
  1417 
  1418     /**
  1419      * Creates a {@code ForkJoinPool} with the indicated parallelism
  1420      * level, the {@linkplain
  1421      * #defaultForkJoinWorkerThreadFactory default thread factory},
  1422      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  1423      *
  1424      * @param parallelism the parallelism level
  1425      * @throws IllegalArgumentException if parallelism less than or
  1426      *         equal to zero, or greater than implementation limit
  1427      * @throws SecurityException if a security manager exists and
  1428      *         the caller is not permitted to modify threads
  1429      *         because it does not hold {@link
  1430      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1431      */
  1432     public ForkJoinPool(int parallelism) {
  1433         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
  1434     }
  1435 
  1436     /**
  1437      * Creates a {@code ForkJoinPool} with the given parameters.
  1438      *
  1439      * @param parallelism the parallelism level. For default value,
  1440      * use {@link java.lang.Runtime#availableProcessors}.
  1441      * @param factory the factory for creating new threads. For default value,
  1442      * use {@link #defaultForkJoinWorkerThreadFactory}.
  1443      * @param handler the handler for internal worker threads that
  1444      * terminate due to unrecoverable errors encountered while executing
  1445      * tasks. For default value, use {@code null}.
  1446      * @param asyncMode if true,
  1447      * establishes local first-in-first-out scheduling mode for forked
  1448      * tasks that are never joined. This mode may be more appropriate
  1449      * than default locally stack-based mode in applications in which
  1450      * worker threads only process event-style asynchronous tasks.
  1451      * For default value, use {@code false}.
  1452      * @throws IllegalArgumentException if parallelism less than or
  1453      *         equal to zero, or greater than implementation limit
  1454      * @throws NullPointerException if the factory is null
  1455      * @throws SecurityException if a security manager exists and
  1456      *         the caller is not permitted to modify threads
  1457      *         because it does not hold {@link
  1458      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1459      */
  1460     public ForkJoinPool(int parallelism,
  1461                         ForkJoinWorkerThreadFactory factory,
  1462                         Thread.UncaughtExceptionHandler handler,
  1463                         boolean asyncMode) {
  1464         checkPermission();
  1465         if (factory == null)
  1466             throw new NullPointerException();
  1467         if (parallelism <= 0 || parallelism > MAX_ID)
  1468             throw new IllegalArgumentException();
  1469         this.parallelism = parallelism;
  1470         this.factory = factory;
  1471         this.ueh = handler;
  1472         this.locallyFifo = asyncMode;
  1473         long np = (long)(-parallelism); // offset ctl counts
  1474         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  1475         this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
  1476         // initialize workers array with room for 2*parallelism if possible
  1477         int n = parallelism << 1;
  1478         if (n >= MAX_ID)
  1479             n = MAX_ID;
  1480         else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
  1481             n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
  1482         }
  1483         workers = new ForkJoinWorkerThread[n + 1];
  1484         this.submissionLock = new ReentrantLock();
  1485         this.termination = submissionLock.newCondition();
  1486         StringBuilder sb = new StringBuilder("ForkJoinPool-");
  1487         sb.append(poolNumberGenerator.incrementAndGet());
  1488         sb.append("-worker-");
  1489         this.workerNamePrefix = sb.toString();
  1490     }
  1491 
  1492     // Execution methods
  1493 
  1494     /**
  1495      * Performs the given task, returning its result upon completion.
  1496      * If the computation encounters an unchecked Exception or Error,
  1497      * it is rethrown as the outcome of this invocation.  Rethrown
  1498      * exceptions behave in the same way as regular exceptions, but,
  1499      * when possible, contain stack traces (as displayed for example
  1500      * using {@code ex.printStackTrace()}) of both the current thread
  1501      * as well as the thread actually encountering the exception;
  1502      * minimally only the latter.
  1503      *
  1504      * @param task the task
  1505      * @return the task's result
  1506      * @throws NullPointerException if the task is null
  1507      * @throws RejectedExecutionException if the task cannot be
  1508      *         scheduled for execution
  1509      */
  1510     public <T> T invoke(ForkJoinTask<T> task) {
  1511         Thread t = Thread.currentThread();
  1512         if (task == null)
  1513             throw new NullPointerException();
  1514         if (shutdown)
  1515             throw new RejectedExecutionException();
  1516         if ((t instanceof ForkJoinWorkerThread) &&
  1517             ((ForkJoinWorkerThread)t).pool == this)
  1518             return task.invoke();  // bypass submit if in same pool
  1519         else {
  1520             addSubmission(task);
  1521             return task.join();
  1522         }
  1523     }
  1524 
  1525     /**
  1526      * Unless terminating, forks task if within an ongoing FJ
  1527      * computation in the current pool, else submits as external task.
  1528      */
  1529     private <T> void forkOrSubmit(ForkJoinTask<T> task) {
  1530         ForkJoinWorkerThread w;
  1531         Thread t = Thread.currentThread();
  1532         if (shutdown)
  1533             throw new RejectedExecutionException();
  1534         if ((t instanceof ForkJoinWorkerThread) &&
  1535             (w = (ForkJoinWorkerThread)t).pool == this)
  1536             w.pushTask(task);
  1537         else
  1538             addSubmission(task);
  1539     }
  1540 
  1541     /**
  1542      * Arranges for (asynchronous) execution of the given task.
  1543      *
  1544      * @param task the task
  1545      * @throws NullPointerException if the task is null
  1546      * @throws RejectedExecutionException if the task cannot be
  1547      *         scheduled for execution
  1548      */
  1549     public void execute(ForkJoinTask<?> task) {
  1550         if (task == null)
  1551             throw new NullPointerException();
  1552         forkOrSubmit(task);
  1553     }
  1554 
  1555     // AbstractExecutorService methods
  1556 
  1557     /**
  1558      * @throws NullPointerException if the task is null
  1559      * @throws RejectedExecutionException if the task cannot be
  1560      *         scheduled for execution
  1561      */
  1562     public void execute(Runnable task) {
  1563         if (task == null)
  1564             throw new NullPointerException();
  1565         ForkJoinTask<?> job;
  1566         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
  1567             job = (ForkJoinTask<?>) task;
  1568         else
  1569             job = ForkJoinTask.adapt(task, null);
  1570         forkOrSubmit(job);
  1571     }
  1572 
  1573     /**
  1574      * Submits a ForkJoinTask for execution.
  1575      *
  1576      * @param task the task to submit
  1577      * @return the task
  1578      * @throws NullPointerException if the task is null
  1579      * @throws RejectedExecutionException if the task cannot be
  1580      *         scheduled for execution
  1581      */
  1582     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  1583         if (task == null)
  1584             throw new NullPointerException();
  1585         forkOrSubmit(task);
  1586         return task;
  1587     }
  1588 
  1589     /**
  1590      * @throws NullPointerException if the task is null
  1591      * @throws RejectedExecutionException if the task cannot be
  1592      *         scheduled for execution
  1593      */
  1594     public <T> ForkJoinTask<T> submit(Callable<T> task) {
  1595         if (task == null)
  1596             throw new NullPointerException();
  1597         ForkJoinTask<T> job = ForkJoinTask.adapt(task);
  1598         forkOrSubmit(job);
  1599         return job;
  1600     }
  1601 
  1602     /**
  1603      * @throws NullPointerException if the task is null
  1604      * @throws RejectedExecutionException if the task cannot be
  1605      *         scheduled for execution
  1606      */
  1607     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
  1608         if (task == null)
  1609             throw new NullPointerException();
  1610         ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
  1611         forkOrSubmit(job);
  1612         return job;
  1613     }
  1614 
  1615     /**
  1616      * @throws NullPointerException if the task is null
  1617      * @throws RejectedExecutionException if the task cannot be
  1618      *         scheduled for execution
  1619      */
  1620     public ForkJoinTask<?> submit(Runnable task) {
  1621         if (task == null)
  1622             throw new NullPointerException();
  1623         ForkJoinTask<?> job;
  1624         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
  1625             job = (ForkJoinTask<?>) task;
  1626         else
  1627             job = ForkJoinTask.adapt(task, null);
  1628         forkOrSubmit(job);
  1629         return job;
  1630     }
  1631 
  1632     /**
  1633      * @throws NullPointerException       {@inheritDoc}
  1634      * @throws RejectedExecutionException {@inheritDoc}
  1635      */
  1636     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
  1637         ArrayList<ForkJoinTask<T>> forkJoinTasks =
  1638             new ArrayList<ForkJoinTask<T>>(tasks.size());
  1639         for (Callable<T> task : tasks)
  1640             forkJoinTasks.add(ForkJoinTask.adapt(task));
  1641         invoke(new InvokeAll<T>(forkJoinTasks));
  1642 
  1643         @SuppressWarnings({"unchecked", "rawtypes"})
  1644             List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
  1645         return futures;
  1646     }
  1647 
  1648     static final class InvokeAll<T> extends RecursiveAction {
  1649         final ArrayList<ForkJoinTask<T>> tasks;
  1650         InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
  1651         public void compute() {
  1652             try { invokeAll(tasks); }
  1653             catch (Exception ignore) {}
  1654         }
  1655         private static final long serialVersionUID = -7914297376763021607L;
  1656     }
  1657 
  1658     /**
  1659      * Returns the factory used for constructing new workers.
  1660      *
  1661      * @return the factory used for constructing new workers
  1662      */
  1663     public ForkJoinWorkerThreadFactory getFactory() {
  1664         return factory;
  1665     }
  1666 
  1667     /**
  1668      * Returns the handler for internal worker threads that terminate
  1669      * due to unrecoverable errors encountered while executing tasks.
  1670      *
  1671      * @return the handler, or {@code null} if none
  1672      */
  1673     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
  1674         return ueh;
  1675     }
  1676 
  1677     /**
  1678      * Returns the targeted parallelism level of this pool.
  1679      *
  1680      * @return the targeted parallelism level of this pool
  1681      */
  1682     public int getParallelism() {
  1683         return parallelism;
  1684     }
  1685 
  1686     /**
  1687      * Returns the number of worker threads that have started but not
  1688      * yet terminated.  The result returned by this method may differ
  1689      * from {@link #getParallelism} when threads are created to
  1690      * maintain parallelism when others are cooperatively blocked.
  1691      *
  1692      * @return the number of worker threads
  1693      */
  1694     public int getPoolSize() {
  1695         return parallelism + (short)(ctl >>> TC_SHIFT);
  1696     }
  1697 
  1698     /**
  1699      * Returns {@code true} if this pool uses local first-in-first-out
  1700      * scheduling mode for forked tasks that are never joined.
  1701      *
  1702      * @return {@code true} if this pool uses async mode
  1703      */
  1704     public boolean getAsyncMode() {
  1705         return locallyFifo;
  1706     }
  1707 
  1708     /**
  1709      * Returns an estimate of the number of worker threads that are
  1710      * not blocked waiting to join tasks or for other managed
  1711      * synchronization. This method may overestimate the
  1712      * number of running threads.
  1713      *
  1714      * @return the number of worker threads
  1715      */
  1716     public int getRunningThreadCount() {
  1717         int r = parallelism + (int)(ctl >> AC_SHIFT);
  1718         return (r <= 0) ? 0 : r; // suppress momentarily negative values
  1719     }
  1720 
  1721     /**
  1722      * Returns an estimate of the number of threads that are currently
  1723      * stealing or executing tasks. This method may overestimate the
  1724      * number of active threads.
  1725      *
  1726      * @return the number of active threads
  1727      */
  1728     public int getActiveThreadCount() {
  1729         int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
  1730         return (r <= 0) ? 0 : r; // suppress momentarily negative values
  1731     }
  1732 
  1733     /**
  1734      * Returns {@code true} if all worker threads are currently idle.
  1735      * An idle worker is one that cannot obtain a task to execute
  1736      * because none are available to steal from other threads, and
  1737      * there are no pending submissions to the pool. This method is
  1738      * conservative; it might not return {@code true} immediately upon
  1739      * idleness of all threads, but will eventually become true if
  1740      * threads remain inactive.
  1741      *
  1742      * @return {@code true} if all threads are currently idle
  1743      */
  1744     public boolean isQuiescent() {
  1745         return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0;
  1746     }
  1747 
  1748     /**
  1749      * Returns an estimate of the total number of tasks stolen from
  1750      * one thread's work queue by another. The reported value
  1751      * underestimates the actual total number of steals when the pool
  1752      * is not quiescent. This value may be useful for monitoring and
  1753      * tuning fork/join programs: in general, steal counts should be
  1754      * high enough to keep threads busy, but low enough to avoid
  1755      * overhead and contention across threads.
  1756      *
  1757      * @return the number of steals
  1758      */
  1759     public long getStealCount() {
  1760         return stealCount;
  1761     }
  1762 
  1763     /**
  1764      * Returns an estimate of the total number of tasks currently held
  1765      * in queues by worker threads (but not including tasks submitted
  1766      * to the pool that have not begun executing). This value is only
  1767      * an approximation, obtained by iterating across all threads in
  1768      * the pool. This method may be useful for tuning task
  1769      * granularities.
  1770      *
  1771      * @return the number of queued tasks
  1772      */
  1773     public long getQueuedTaskCount() {
  1774         long count = 0;
  1775         ForkJoinWorkerThread[] ws;
  1776         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
  1777             (ws = workers) != null) {
  1778             for (ForkJoinWorkerThread w : ws)
  1779                 if (w != null)
  1780                     count -= w.queueBase - w.queueTop; // must read base first
  1781         }
  1782         return count;
  1783     }
  1784 
  1785     /**
  1786      * Returns an estimate of the number of tasks submitted to this
  1787      * pool that have not yet begun executing.  This method may take
  1788      * time proportional to the number of submissions.
  1789      *
  1790      * @return the number of queued submissions
  1791      */
  1792     public int getQueuedSubmissionCount() {
  1793         return -queueBase + queueTop;
  1794     }
  1795 
  1796     /**
  1797      * Returns {@code true} if there are any tasks submitted to this
  1798      * pool that have not yet begun executing.
  1799      *
  1800      * @return {@code true} if there are any queued submissions
  1801      */
  1802     public boolean hasQueuedSubmissions() {
  1803         return queueBase != queueTop;
  1804     }
  1805 
  1806     /**
  1807      * Removes and returns the next unexecuted submission if one is
  1808      * available.  This method may be useful in extensions to this
  1809      * class that re-assign work in systems with multiple pools.
  1810      *
  1811      * @return the next submission, or {@code null} if none
  1812      */
  1813     protected ForkJoinTask<?> pollSubmission() {
  1814         ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
  1815         while ((b = queueBase) != queueTop &&
  1816                (q = submissionQueue) != null &&
  1817                (i = (q.length - 1) & b) >= 0) {
  1818             long u = (i << ASHIFT) + ABASE;
  1819             if ((t = q[i]) != null &&
  1820                 queueBase == b &&
  1821                 UNSAFE.compareAndSwapObject(q, u, t, null)) {
  1822                 queueBase = b + 1;
  1823                 return t;
  1824             }
  1825         }
  1826         return null;
  1827     }
  1828 
  1829     /**
  1830      * Removes all available unexecuted submitted and forked tasks
  1831      * from scheduling queues and adds them to the given collection,
  1832      * without altering their execution status. These may include
  1833      * artificially generated or wrapped tasks. This method is
  1834      * designed to be invoked only when the pool is known to be
  1835      * quiescent. Invocations at other times may not remove all
  1836      * tasks. A failure encountered while attempting to add elements
  1837      * to collection {@code c} may result in elements being in
  1838      * neither, either or both collections when the associated
  1839      * exception is thrown.  The behavior of this operation is
  1840      * undefined if the specified collection is modified while the
  1841      * operation is in progress.
  1842      *
  1843      * @param c the collection to transfer elements into
  1844      * @return the number of elements transferred
  1845      */
  1846     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
  1847         int count = 0;
  1848         while (queueBase != queueTop) {
  1849             ForkJoinTask<?> t = pollSubmission();
  1850             if (t != null) {
  1851                 c.add(t);
  1852                 ++count;
  1853             }
  1854         }
  1855         ForkJoinWorkerThread[] ws;
  1856         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
  1857             (ws = workers) != null) {
  1858             for (ForkJoinWorkerThread w : ws)
  1859                 if (w != null)
  1860                     count += w.drainTasksTo(c);
  1861         }
  1862         return count;
  1863     }
  1864 
  1865     /**
  1866      * Returns a string identifying this pool, as well as its state,
  1867      * including indications of run state, parallelism level, and
  1868      * worker and task counts.
  1869      *
  1870      * @return a string identifying this pool, as well as its state
  1871      */
  1872     public String toString() {
  1873         long st = getStealCount();
  1874         long qt = getQueuedTaskCount();
  1875         long qs = getQueuedSubmissionCount();
  1876         int pc = parallelism;
  1877         long c = ctl;
  1878         int tc = pc + (short)(c >>> TC_SHIFT);
  1879         int rc = pc + (int)(c >> AC_SHIFT);
  1880         if (rc < 0) // ignore transient negative
  1881             rc = 0;
  1882         int ac = rc + blockedCount;
  1883         String level;
  1884         if ((c & STOP_BIT) != 0)
  1885             level = (tc == 0) ? "Terminated" : "Terminating";
  1886         else
  1887             level = shutdown ? "Shutting down" : "Running";
  1888         return super.toString() +
  1889             "[" + level +
  1890             ", parallelism = " + pc +
  1891             ", size = " + tc +
  1892             ", active = " + ac +
  1893             ", running = " + rc +
  1894             ", steals = " + st +
  1895             ", tasks = " + qt +
  1896             ", submissions = " + qs +
  1897             "]";
  1898     }
  1899 
  1900     /**
  1901      * Initiates an orderly shutdown in which previously submitted
  1902      * tasks are executed, but no new tasks will be accepted.
  1903      * Invocation has no additional effect if already shut down.
  1904      * Tasks that are in the process of being submitted concurrently
  1905      * during the course of this method may or may not be rejected.
  1906      *
  1907      * @throws SecurityException if a security manager exists and
  1908      *         the caller is not permitted to modify threads
  1909      *         because it does not hold {@link
  1910      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1911      */
  1912     public void shutdown() {
  1913         checkPermission();
  1914         shutdown = true;
  1915         tryTerminate(false);
  1916     }
  1917 
  1918     /**
  1919      * Attempts to cancel and/or stop all tasks, and reject all
  1920      * subsequently submitted tasks.  Tasks that are in the process of
  1921      * being submitted or executed concurrently during the course of
  1922      * this method may or may not be rejected. This method cancels
  1923      * both existing and unexecuted tasks, in order to permit
  1924      * termination in the presence of task dependencies. So the method
  1925      * always returns an empty list (unlike the case for some other
  1926      * Executors).
  1927      *
  1928      * @return an empty list
  1929      * @throws SecurityException if a security manager exists and
  1930      *         the caller is not permitted to modify threads
  1931      *         because it does not hold {@link
  1932      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1933      */
  1934     public List<Runnable> shutdownNow() {
  1935         checkPermission();
  1936         shutdown = true;
  1937         tryTerminate(true);
  1938         return Collections.emptyList();
  1939     }
  1940 
  1941     /**
  1942      * Returns {@code true} if all tasks have completed following shut down.
  1943      *
  1944      * @return {@code true} if all tasks have completed following shut down
  1945      */
  1946     public boolean isTerminated() {
  1947         long c = ctl;
  1948         return ((c & STOP_BIT) != 0L &&
  1949                 (short)(c >>> TC_SHIFT) == -parallelism);
  1950     }
  1951 
  1952     /**
  1953      * Returns {@code true} if the process of termination has
  1954      * commenced but not yet completed.  This method may be useful for
  1955      * debugging. A return of {@code true} reported a sufficient
  1956      * period after shutdown may indicate that submitted tasks have
  1957      * ignored or suppressed interruption, or are waiting for IO,
  1958      * causing this executor not to properly terminate. (See the
  1959      * advisory notes for class {@link ForkJoinTask} stating that
  1960      * tasks should not normally entail blocking operations.  But if
  1961      * they do, they must abort them on interrupt.)
  1962      *
  1963      * @return {@code true} if terminating but not yet terminated
  1964      */
  1965     public boolean isTerminating() {
  1966         long c = ctl;
  1967         return ((c & STOP_BIT) != 0L &&
  1968                 (short)(c >>> TC_SHIFT) != -parallelism);
  1969     }
  1970 
  1971     /**
  1972      * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
  1973      */
  1974     final boolean isAtLeastTerminating() {
  1975         return (ctl & STOP_BIT) != 0L;
  1976     }
  1977 
  1978     /**
  1979      * Returns {@code true} if this pool has been shut down.
  1980      *
  1981      * @return {@code true} if this pool has been shut down
  1982      */
  1983     public boolean isShutdown() {
  1984         return shutdown;
  1985     }
  1986 
  1987     /**
  1988      * Blocks until all tasks have completed execution after a shutdown
  1989      * request, or the timeout occurs, or the current thread is
  1990      * interrupted, whichever happens first.
  1991      *
  1992      * @param timeout the maximum time to wait
  1993      * @param unit the time unit of the timeout argument
  1994      * @return {@code true} if this executor terminated and
  1995      *         {@code false} if the timeout elapsed before termination
  1996      * @throws InterruptedException if interrupted while waiting
  1997      */
  1998     public boolean awaitTermination(long timeout, TimeUnit unit)
  1999         throws InterruptedException {
  2000         long nanos = unit.toNanos(timeout);
  2001         final ReentrantLock lock = this.submissionLock;
  2002         lock.lock();
  2003         try {
  2004             for (;;) {
  2005                 if (isTerminated())
  2006                     return true;
  2007                 if (nanos <= 0)
  2008                     return false;
  2009                 nanos = termination.awaitNanos(nanos);
  2010             }
  2011         } finally {
  2012             lock.unlock();
  2013         }
  2014     }
  2015 
  2016     /**
  2017      * Interface for extending managed parallelism for tasks running
  2018      * in {@link ForkJoinPool}s.
  2019      *
  2020      * <p>A {@code ManagedBlocker} provides two methods.  Method
  2021      * {@code isReleasable} must return {@code true} if blocking is
  2022      * not necessary. Method {@code block} blocks the current thread
  2023      * if necessary (perhaps internally invoking {@code isReleasable}
  2024      * before actually blocking). These actions are performed by any
  2025      * thread invoking {@link ForkJoinPool#managedBlock}.  The
  2026      * unusual methods in this API accommodate synchronizers that may,
  2027      * but don't usually, block for long periods. Similarly, they
  2028      * allow more efficient internal handling of cases in which
  2029      * additional workers may be, but usually are not, needed to
  2030      * ensure sufficient parallelism.  Toward this end,
  2031      * implementations of method {@code isReleasable} must be amenable
  2032      * to repeated invocation.
  2033      *
  2034      * <p>For example, here is a ManagedBlocker based on a
  2035      * ReentrantLock:
  2036      *  <pre> {@code
  2037      * class ManagedLocker implements ManagedBlocker {
  2038      *   final ReentrantLock lock;
  2039      *   boolean hasLock = false;
  2040      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
  2041      *   public boolean block() {
  2042      *     if (!hasLock)
  2043      *       lock.lock();
  2044      *     return true;
  2045      *   }
  2046      *   public boolean isReleasable() {
  2047      *     return hasLock || (hasLock = lock.tryLock());
  2048      *   }
  2049      * }}</pre>
  2050      *
  2051      * <p>Here is a class that possibly blocks waiting for an
  2052      * item on a given queue:
  2053      *  <pre> {@code
  2054      * class QueueTaker<E> implements ManagedBlocker {
  2055      *   final BlockingQueue<E> queue;
  2056      *   volatile E item = null;
  2057      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
  2058      *   public boolean block() throws InterruptedException {
  2059      *     if (item == null)
  2060      *       item = queue.take();
  2061      *     return true;
  2062      *   }
  2063      *   public boolean isReleasable() {
  2064      *     return item != null || (item = queue.poll()) != null;
  2065      *   }
  2066      *   public E getItem() { // call after pool.managedBlock completes
  2067      *     return item;
  2068      *   }
  2069      * }}</pre>
  2070      */
  2071     public static interface ManagedBlocker {
  2072         /**
  2073          * Possibly blocks the current thread, for example waiting for
  2074          * a lock or condition.
  2075          *
  2076          * @return {@code true} if no additional blocking is necessary
  2077          * (i.e., if isReleasable would return true)
  2078          * @throws InterruptedException if interrupted while waiting
  2079          * (the method is not required to do so, but is allowed to)
  2080          */
  2081         boolean block() throws InterruptedException;
  2082 
  2083         /**
  2084          * Returns {@code true} if blocking is unnecessary.
  2085          */
  2086         boolean isReleasable();
  2087     }
  2088 
  2089     /**
  2090      * Blocks in accord with the given blocker.  If the current thread
  2091      * is a {@link ForkJoinWorkerThread}, this method possibly
  2092      * arranges for a spare thread to be activated if necessary to
  2093      * ensure sufficient parallelism while the current thread is blocked.
  2094      *
  2095      * <p>If the caller is not a {@link ForkJoinTask}, this method is
  2096      * behaviorally equivalent to
  2097      *  <pre> {@code
  2098      * while (!blocker.isReleasable())
  2099      *   if (blocker.block())
  2100      *     return;
  2101      * }</pre>
  2102      *
  2103      * If the caller is a {@code ForkJoinTask}, then the pool may
  2104      * first be expanded to ensure parallelism, and later adjusted.
  2105      *
  2106      * @param blocker the blocker
  2107      * @throws InterruptedException if blocker.block did so
  2108      */
  2109     public static void managedBlock(ManagedBlocker blocker)
  2110         throws InterruptedException {
  2111         Thread t = Thread.currentThread();
  2112         if (t instanceof ForkJoinWorkerThread) {
  2113             ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
  2114             w.pool.awaitBlocker(blocker);
  2115         }
  2116         else {
  2117             do {} while (!blocker.isReleasable() && !blocker.block());
  2118         }
  2119     }
  2120 
    // AbstractExecutorService overrides.  These rely on the undocumented
    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
    // implement RunnableFuture.
  2124 
  2125     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  2126         return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
  2127     }
  2128 
  2129     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  2130         return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
  2131     }
  2132 
  2133     // Unsafe mechanics
  2134     private static final sun.misc.Unsafe UNSAFE;
  2135     private static final long ctlOffset;
  2136     private static final long stealCountOffset;
  2137     private static final long blockedCountOffset;
  2138     private static final long quiescerCountOffset;
  2139     private static final long scanGuardOffset;
  2140     private static final long nextWorkerNumberOffset;
  2141     private static final long ABASE;
  2142     private static final int ASHIFT;
  2143 
  2144     static {
  2145         poolNumberGenerator = new AtomicInteger();
  2146         workerSeedGenerator = new Random();
  2147         modifyThreadPermission = new RuntimePermission("modifyThread");
  2148         defaultForkJoinWorkerThreadFactory =
  2149             new DefaultForkJoinWorkerThreadFactory();
  2150         int s;
  2151         try {
  2152             UNSAFE = sun.misc.Unsafe.getUnsafe();
  2153             Class k = ForkJoinPool.class;
  2154             ctlOffset = UNSAFE.objectFieldOffset
  2155                 (k.getDeclaredField("ctl"));
  2156             stealCountOffset = UNSAFE.objectFieldOffset
  2157                 (k.getDeclaredField("stealCount"));
  2158             blockedCountOffset = UNSAFE.objectFieldOffset
  2159                 (k.getDeclaredField("blockedCount"));
  2160             quiescerCountOffset = UNSAFE.objectFieldOffset
  2161                 (k.getDeclaredField("quiescerCount"));
  2162             scanGuardOffset = UNSAFE.objectFieldOffset
  2163                 (k.getDeclaredField("scanGuard"));
  2164             nextWorkerNumberOffset = UNSAFE.objectFieldOffset
  2165                 (k.getDeclaredField("nextWorkerNumber"));
  2166             Class a = ForkJoinTask[].class;
  2167             ABASE = UNSAFE.arrayBaseOffset(a);
  2168             s = UNSAFE.arrayIndexScale(a);
  2169         } catch (Exception e) {
  2170             throw new Error(e);
  2171         }
  2172         if ((s & (s-1)) != 0)
  2173             throw new Error("data type scale not a power of two");
  2174         ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
  2175     }
  2176 
  2177 }