rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinPool.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 12:51:03 +0100
changeset 1895 bfaf3300b7ba
parent 1890 212417b74b72
child 1896 9984d9a62bc0
permissions -rw-r--r--
Making java.util.concurrent package compilable except ForkJoinPool
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.ArrayList;
    39 import java.util.Arrays;
    40 import java.util.Collection;
    41 import java.util.Collections;
    42 import java.util.List;
    43 import java.util.Random;
    44 import java.util.concurrent.AbstractExecutorService;
    45 import java.util.concurrent.Callable;
    46 import java.util.concurrent.ExecutorService;
    47 import java.util.concurrent.Future;
    48 import java.util.concurrent.RejectedExecutionException;
    49 import java.util.concurrent.RunnableFuture;
    50 import java.util.concurrent.TimeUnit;
    51 import java.util.concurrent.atomic.AtomicInteger;
    52 import java.util.concurrent.locks.LockSupport;
    53 import java.util.concurrent.locks.ReentrantLock;
    54 import java.util.concurrent.locks.Condition;
    55 
    56 /**
    57  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
    58  * A {@code ForkJoinPool} provides the entry point for submissions
    59  * from non-{@code ForkJoinTask} clients, as well as management and
    60  * monitoring operations.
    61  *
    62  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
    63  * ExecutorService} mainly by virtue of employing
    64  * <em>work-stealing</em>: all threads in the pool attempt to find and
    65  * execute subtasks created by other active tasks (eventually blocking
    66  * waiting for work if none exist). This enables efficient processing
    67  * when most tasks spawn other subtasks (as do most {@code
    68  * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
    69  * constructors, {@code ForkJoinPool}s may also be appropriate for use
    70  * with event-style tasks that are never joined.
    71  *
    72  * <p>A {@code ForkJoinPool} is constructed with a given target
    73  * parallelism level; by default, equal to the number of available
    74  * processors. The pool attempts to maintain enough active (or
    75  * available) threads by dynamically adding, suspending, or resuming
    76  * internal worker threads, even if some tasks are stalled waiting to
    77  * join others. However, no such adjustments are guaranteed in the
    78  * face of blocked IO or other unmanaged synchronization. The nested
    79  * {@link ManagedBlocker} interface enables extension of the kinds of
    80  * synchronization accommodated.
    81  *
    82  * <p>In addition to execution and lifecycle control methods, this
    83  * class provides status check methods (for example
    84  * {@link #getStealCount}) that are intended to aid in developing,
    85  * tuning, and monitoring fork/join applications. Also, method
    86  * {@link #toString} returns indications of pool state in a
    87  * convenient form for informal monitoring.
    88  *
    89  * <p> As is the case with other ExecutorServices, there are three
    90  * main task execution methods summarized in the following
    91  * table. These are designed to be used by clients not already engaged
    92  * in fork/join computations in the current pool.  The main forms of
    93  * these methods accept instances of {@code ForkJoinTask}, but
    94  * overloaded forms also allow mixed execution of plain {@code
    95  * Runnable}- or {@code Callable}- based activities as well.  However,
    96  * tasks that are already executing in a pool should normally
    97  * <em>NOT</em> use these pool execution methods, but instead use the
    98  * within-computation forms listed in the table.
    99  *
   100  * <table BORDER CELLPADDING=3 CELLSPACING=1>
   101  *  <tr>
   102  *    <td></td>
   103  *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
   104  *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
   105  *  </tr>
   106  *  <tr>
 *    <td> <b>Arrange async execution</b></td>
   108  *    <td> {@link #execute(ForkJoinTask)}</td>
   109  *    <td> {@link ForkJoinTask#fork}</td>
   110  *  </tr>
   111  *  <tr>
 *    <td> <b>Await and obtain result</b></td>
   113  *    <td> {@link #invoke(ForkJoinTask)}</td>
   114  *    <td> {@link ForkJoinTask#invoke}</td>
   115  *  </tr>
   116  *  <tr>
 *    <td> <b>Arrange exec and obtain Future</b></td>
   118  *    <td> {@link #submit(ForkJoinTask)}</td>
   119  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
   120  *  </tr>
   121  * </table>
   122  *
   123  * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
   124  * used for all parallel task execution in a program or subsystem.
   125  * Otherwise, use would not usually outweigh the construction and
   126  * bookkeeping overhead of creating a large set of threads. For
   127  * example, a common pool could be used for the {@code SortTasks}
   128  * illustrated in {@link RecursiveAction}. Because {@code
   129  * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
   130  * daemon} mode, there is typically no need to explicitly {@link
   131  * #shutdown} such a pool upon program exit.
   132  *
   133  * <pre>
   134  * static final ForkJoinPool mainPool = new ForkJoinPool();
   135  * ...
   136  * public void sort(long[] array) {
   137  *   mainPool.invoke(new SortTask(array, 0, array.length));
   138  * }
   139  * </pre>
   140  *
   141  * <p><b>Implementation notes</b>: This implementation restricts the
   142  * maximum number of running threads to 32767. Attempts to create
   143  * pools with greater than the maximum number result in
   144  * {@code IllegalArgumentException}.
   145  *
   146  * <p>This implementation rejects submitted tasks (that is, by throwing
   147  * {@link RejectedExecutionException}) only when the pool is shut down
   148  * or internal resources have been exhausted.
   149  *
   150  * @since 1.7
   151  * @author Doug Lea
   152  */
   153 public class ForkJoinPool extends AbstractExecutorService {
   154 
   155     /*
   156      * Implementation Overview
   157      *
   158      * This class provides the central bookkeeping and control for a
   159      * set of worker threads: Submissions from non-FJ threads enter
   160      * into a submission queue. Workers take these tasks and typically
   161      * split them into subtasks that may be stolen by other workers.
   162      * Preference rules give first priority to processing tasks from
   163      * their own queues (LIFO or FIFO, depending on mode), then to
   164      * randomized FIFO steals of tasks in other worker queues, and
   165      * lastly to new submissions.
   166      *
   167      * The main throughput advantages of work-stealing stem from
   168      * decentralized control -- workers mostly take tasks from
   169      * themselves or each other. We cannot negate this in the
   170      * implementation of other management responsibilities. The main
   171      * tactic for avoiding bottlenecks is packing nearly all
   172      * essentially atomic control state into a single 64bit volatile
   173      * variable ("ctl"). This variable is read on the order of 10-100
   174      * times as often as it is modified (always via CAS). (There is
   175      * some additional control state, for example variable "shutdown"
   176      * for which we can cope with uncoordinated updates.)  This
   177      * streamlines synchronization and control at the expense of messy
   178      * constructions needed to repack status bits upon updates.
   179      * Updates tend not to contend with each other except during
   180      * bursts while submitted tasks begin or end.  In some cases when
   181      * they do contend, threads can instead do something else
   182      * (usually, scan for tasks) until contention subsides.
   183      *
   184      * To enable packing, we restrict maximum parallelism to (1<<15)-1
   185      * (which is far in excess of normal operating range) to allow
   186      * ids, counts, and their negations (used for thresholding) to fit
   187      * into 16bit fields.
   188      *
   189      * Recording Workers.  Workers are recorded in the "workers" array
   190      * that is created upon pool construction and expanded if (rarely)
   191      * necessary.  This is an array as opposed to some other data
   192      * structure to support index-based random steals by workers.
   193      * Updates to the array recording new workers and unrecording
   194      * terminated ones are protected from each other by a seqLock
   195      * (scanGuard) but the array is otherwise concurrently readable,
   196      * and accessed directly by workers. To simplify index-based
   197      * operations, the array size is always a power of two, and all
   198      * readers must tolerate null slots. To avoid flailing during
   199      * start-up, the array is presized to hold twice #parallelism
   200      * workers (which is unlikely to need further resizing during
   201      * execution). But to avoid dealing with so many null slots,
   202      * variable scanGuard includes a mask for the nearest power of two
   203      * that contains all current workers.  All worker thread creation
   204      * is on-demand, triggered by task submissions, replacement of
   205      * terminated workers, and/or compensation for blocked
   206      * workers. However, all other support code is set up to work with
   207      * other policies.  To ensure that we do not hold on to worker
   208      * references that would prevent GC, ALL accesses to workers are
   209      * via indices into the workers array (which is one source of some
   210      * of the messy code constructions here). In essence, the workers
   211      * array serves as a weak reference mechanism. Thus for example
   212      * the wait queue field of ctl stores worker indices, not worker
   213      * references.  Access to the workers in associated methods (for
   214      * example signalWork) must both index-check and null-check the
   215      * IDs. All such accesses ignore bad IDs by returning out early
   216      * from what they are doing, since this can only be associated
   217      * with termination, in which case it is OK to give up.
   218      *
   219      * All uses of the workers array, as well as queue arrays, check
   220      * that the array is non-null (even if previously non-null). This
   221      * allows nulling during termination, which is currently not
   222      * necessary, but remains an option for resource-revocation-based
   223      * shutdown schemes.
   224      *
   225      * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
   226      * let workers spin indefinitely scanning for tasks when none can
   227      * be found immediately, and we cannot start/resume workers unless
   228      * there appear to be tasks available.  On the other hand, we must
   229      * quickly prod them into action when new tasks are submitted or
   230      * generated.  We park/unpark workers after placing in an event
   231      * wait queue when they cannot find work. This "queue" is actually
   232      * a simple Treiber stack, headed by the "id" field of ctl, plus a
   233      * 15bit counter value to both wake up waiters (by advancing their
   234      * count) and avoid ABA effects. Successors are held in worker
   235      * field "nextWait".  Queuing deals with several intrinsic races,
   236      * mainly that a task-producing thread can miss seeing (and
   237      * signalling) another thread that gave up looking for work but
   238      * has not yet entered the wait queue. We solve this by requiring
   239      * a full sweep of all workers both before (in scan()) and after
   240      * (in tryAwaitWork()) a newly waiting worker is added to the wait
   241      * queue. During a rescan, the worker might release some other
   242      * queued worker rather than itself, which has the same net
   243      * effect. Because enqueued workers may actually be rescanning
   244      * rather than waiting, we set and clear the "parked" field of
   245      * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
   246      * (Use of the parked field requires a secondary recheck to avoid
   247      * missed signals.)
   248      *
   249      * Signalling.  We create or wake up workers only when there
   250      * appears to be at least one task they might be able to find and
   251      * execute.  When a submission is added or another worker adds a
   252      * task to a queue that previously had two or fewer tasks, they
   253      * signal waiting workers (or trigger creation of new ones if
   254      * fewer than the given parallelism level -- see signalWork).
   255      * These primary signals are buttressed by signals during rescans
   256      * as well as those performed when a worker steals a task and
   257      * notices that there are more tasks too; together these cover the
   258      * signals needed in cases when more than two tasks are pushed
   259      * but untaken.
   260      *
   261      * Trimming workers. To release resources after periods of lack of
   262      * use, a worker starting to wait when the pool is quiescent will
   263      * time out and terminate if the pool has remained quiescent for
   264      * SHRINK_RATE nanosecs. This will slowly propagate, eventually
   265      * terminating all workers after long periods of non-use.
   266      *
   267      * Submissions. External submissions are maintained in an
   268      * array-based queue that is structured identically to
   269      * ForkJoinWorkerThread queues except for the use of
   270      * submissionLock in method addSubmission. Unlike the case for
   271      * worker queues, multiple external threads can add new
   272      * submissions, so adding requires a lock.
   273      *
   274      * Compensation. Beyond work-stealing support and lifecycle
   275      * control, the main responsibility of this framework is to take
   276      * actions when one worker is waiting to join a task stolen (or
   277      * always held by) another.  Because we are multiplexing many
   278      * tasks on to a pool of workers, we can't just let them block (as
   279      * in Thread.join).  We also cannot just reassign the joiner's
   280      * run-time stack with another and replace it later, which would
   281      * be a form of "continuation", that even if possible is not
   282      * necessarily a good idea since we sometimes need both an
   283      * unblocked task and its continuation to progress. Instead we
   284      * combine two tactics:
   285      *
   286      *   Helping: Arranging for the joiner to execute some task that it
   287      *      would be running if the steal had not occurred.  Method
   288      *      ForkJoinWorkerThread.joinTask tracks joining->stealing
   289      *      links to try to find such a task.
   290      *
   291      *   Compensating: Unless there are already enough live threads,
   292      *      method tryPreBlock() may create or re-activate a spare
   293      *      thread to compensate for blocked joiners until they
   294      *      unblock.
   295      *
   296      * The ManagedBlocker extension API can't use helping so relies
   297      * only on compensation in method awaitBlocker.
   298      *
   299      * It is impossible to keep exactly the target parallelism number
   300      * of threads running at any given time.  Determining the
   301      * existence of conservatively safe helping targets, the
   302      * availability of already-created spares, and the apparent need
   303      * to create new spares are all racy and require heuristic
   304      * guidance, so we rely on multiple retries of each.  Currently,
   305      * in keeping with on-demand signalling policy, we compensate only
   306      * if blocking would leave less than one active (non-waiting,
   307      * non-blocked) worker. Additionally, to avoid some false alarms
   308      * due to GC, lagging counters, system activity, etc, compensated
   309      * blocking for joins is only attempted after rechecks stabilize
   310      * (retries are interspersed with Thread.yield, for good
   311      * citizenship).  The variable blockedCount, incremented before
   312      * blocking and decremented after, is sometimes needed to
   313      * distinguish cases of waiting for work vs blocking on joins or
   314      * other managed sync. Both cases are equivalent for most pool
   315      * control, so we can update non-atomically. (Additionally,
   316      * contention on blockedCount alleviates some contention on ctl).
   317      *
   318      * Shutdown and Termination. A call to shutdownNow atomically sets
     * the ctl stop bit and then (non-atomically) sets each worker's
   320      * "terminate" status, cancels all unprocessed tasks, and wakes up
   321      * all waiting workers.  Detecting whether termination should
   322      * commence after a non-abrupt shutdown() call requires more work
     * and bookkeeping. We need consensus about quiescence (i.e., that
   324      * there is no more work) which is reflected in active counts so
   325      * long as there are no current blockers, as well as possible
   326      * re-evaluations during independent changes in blocking or
   327      * quiescing workers.
   328      *
   329      * Style notes: There is a lot of representation-level coupling
   330      * among classes ForkJoinPool, ForkJoinWorkerThread, and
   331      * ForkJoinTask.  Most fields of ForkJoinWorkerThread maintain
   332      * data structures managed by ForkJoinPool, so are directly
   333      * accessed.  Conversely we allow access to "workers" array by
   334      * workers, and direct access to ForkJoinTask.status by both
   335      * ForkJoinPool and ForkJoinWorkerThread.  There is little point
   336      * trying to reduce this, since any associated future changes in
   337      * representations will need to be accompanied by algorithmic
   338      * changes anyway. All together, these low-level implementation
   339      * choices produce as much as a factor of 4 performance
   340      * improvement compared to naive implementations, and enable the
   341      * processing of billions of tasks per second, at the expense of
   342      * some ugliness.
   343      *
   344      * Methods signalWork() and scan() are the main bottlenecks so are
   345      * especially heavily micro-optimized/mangled.  There are lots of
   346      * inline assignments (of form "while ((local = field) != 0)")
   347      * which are usually the simplest way to ensure the required read
   348      * orderings (which are sometimes critical). This leads to a
   349      * "C"-like style of listing declarations of these locals at the
   350      * heads of methods or blocks.  There are several occurrences of
   351      * the unusual "do {} while (!cas...)"  which is the simplest way
   352      * to force an update of a CAS'ed variable. There are also other
   353      * coding oddities that help some methods perform reasonably even
   354      * when interpreted (not compiled).
   355      *
   356      * The order of declarations in this file is: (1) declarations of
   357      * statics (2) fields (along with constants used when unpacking
   358      * some of them), listed in an order that tends to reduce
   359      * contention among them a bit under most JVMs.  (3) internal
   360      * control methods (4) callbacks and other support for
   361      * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
   362      * methods (plus a few little helpers). (6) static block
   363      * initializing all statics in a minimally dependent order.
   364      */
   365 
   366     /**
   367      * Factory for creating new {@link ForkJoinWorkerThread}s.
   368      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
   369      * for {@code ForkJoinWorkerThread} subclasses that extend base
   370      * functionality or initialize threads with different contexts.
   371      */
   372     public static interface ForkJoinWorkerThreadFactory {
   373         /**
   374          * Returns a new worker thread operating in the given pool.
   375          *
   376          * @param pool the pool this thread works in
   377          * @throws NullPointerException if the pool is null
   378          */
   379         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
   380     }
   381 
   382     /**
   383      * Default ForkJoinWorkerThreadFactory implementation; creates a
   384      * new ForkJoinWorkerThread.
   385      */
   386     static class DefaultForkJoinWorkerThreadFactory
   387         implements ForkJoinWorkerThreadFactory {
   388         public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
   389             return new ForkJoinWorkerThread(pool);
   390         }
   391     }
   392 
    /**
     * The factory used to create new ForkJoinWorkerThreads unless a
     * different one is supplied to a ForkJoinPool constructor.
     *
     * <p>This is a blank {@code static final}, so Java requires it to be
     * assigned in a static initializer of this class. That initializer is
     * not visible in this excerpt. NOTE(review): presumably it holds a
     * DefaultForkJoinWorkerThreadFactory instance — confirm.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;
   399 
   400     /**
   401      * If there is a security manager, makes sure caller has
   402      * permission to modify threads.
   403      */
   404     private static void checkPermission() {
   405         throw new SecurityException();
   406     }
   407 
    /**
     * Generator for assigning sequence numbers as pool names.
     * This is a blank static final, so it must be assigned in a static
     * initializer of this class.
     */
    private static final AtomicInteger poolNumberGenerator;

    /**
     * Generator for initial random seeds for worker victim
     * selection. This is used only to create initial seeds. Random
     * steals use a cheaper xorshift generator per steal attempt. We
     * don't expect much contention on workerSeedGenerator, so just use
     * a plain Random.
     */
    static final Random workerSeedGenerator;

    /**
     * Array holding all worker threads in the pool. It is initialized
     * upon construction, and its size must be a power of two.
     *
     * <p>Updates and replacements are protected by scanGuard. The array
     * is nevertheless always kept consistent enough to be accessed at
     * random, without locking, by:
     * <ul>
     * <li>workers performing work-stealing, and
     * <li>other traversal-based methods in this class.
     * </ul>
     * This holds only so long as readers memory-acquire by first reading
     * ctl. All readers must tolerate null slots.
     *
     * <p>Not final: the overview notes the array may be expanded (rarely).
     */
    ForkJoinWorkerThread[] workers;
   433 
    /**
     * Initial size for the submission queue array. Must be a power of
     * two. In many applications these queues always stay small, so we
     * use a small initial capacity.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 8;

    /**
     * Maximum size for the submission queue array. It must be a power of
     * two less than or equal to 1 << (31 - width of array entry), which
     * ensures that indices cannot wrap around. It is capped at a lower
     * value to help users trap runaway computations.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    /**
     * Array serving as the submission queue. Initialized upon
     * construction. Not final, so it can be replaced (e.g. when grown).
     */
    private ForkJoinTask<?>[] submissionQueue;

    /**
     * Lock protecting the submission queue array in addSubmission.
     * Unlike worker queues, multiple external threads can add
     * submissions, so adding requires a lock.
     */
    private final ReentrantLock submissionLock;

    /**
     * Condition for awaitTermination. It is created from
     * submissionLock for convenience, rather than from a dedicated lock.
     */
    private final Condition termination;

    /**
     * Creation factory for worker threads.
     */
    private final ForkJoinWorkerThreadFactory factory;

    /**
     * The uncaught exception handler used when any worker abruptly
     * terminates.
     */
    final Thread.UncaughtExceptionHandler ueh;

    /**
     * Prefix for assigning names to worker threads.
     */
    private final String workerNamePrefix;

    /**
     * Sum of per-thread steal counts, updated only when threads are
     * idle or terminating.
     */
    private volatile long stealCount;
   486 
    /**
     * Main pool control -- a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
     *
     * Bit layout, as implied by the *_SHIFT constants below:
     * <pre>
     *   63..48  AC   (AC_SHIFT = 48)
     *   47..32  TC   (TC_SHIFT = 32)
     *   31      ST   (ST_SHIFT = 31)
     *   30..16  EC   (EC_SHIFT = 16)
     *   15..0   ID
     * </pre>
     *
     * When convenient, we extract the two halves separately: the upper
     * 32 bits hold the counts, u = (int)(ctl >>> 32), and the lower
     * 32 bits hold the queue state, e = (int)ctl. The ec field is never
     * accessed alone, but always together with id and st.
     *
     * The counts are offset by the target parallelism, and the fields
     * are positioned so that the most common checks are sign tests:
     * <ul>
     * <li>ac negative: not enough active workers;
     * <li>tc negative: not enough total workers;
     * <li>id negative: at least one waiting worker;
     * <li>e negative: the pool is terminating (ST is bit 31, the sign
     *     bit of e).
     * </ul>
     * Because these fields can be negative, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     */
    volatile long ctl;

    // bit positions/shifts for fields
    private static final int  AC_SHIFT   = 48;
    private static final int  TC_SHIFT   = 32;
    private static final int  ST_SHIFT   = 31;
    private static final int  EC_SHIFT   = 16;

    // bounds
    private static final int  MAX_ID     = 0x7fff;  // max poolIndex
    private static final int  SMASK      = 0xffff;  // mask short bits
    private static final int  SHORT_SIGN = 1 << 15;
    private static final int  INT_SIGN   = 1 << 31;

    // masks
    // STOP_BIT is bit 31, so (int)ctl < 0 exactly when it is set.
    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;

    // units for incrementing and decrementing
    private static final long TC_UNIT    = 1L << TC_SHIFT;
    private static final long AC_UNIT    = 1L << AC_SHIFT;

    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;

    // masks and units for dealing with e = (int)ctl
    private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
    private static final int  EC_UNIT    = 1 << EC_SHIFT;
   542 
    /**
     * The target parallelism level.
     */
    final int parallelism;

    /**
     * Index (mod submission queue length) of the next element to take
     * from the submission queue. Usage is identical to that for
     * per-worker queues -- see ForkJoinWorkerThread internal
     * documentation.
     */
    volatile int queueBase;

    /**
     * Index (mod submission queue length) of the next element to add
     * to the submission queue. Usage is identical to that for
     * per-worker queues -- see ForkJoinWorkerThread internal
     * documentation.
     * NOTE(review): not volatile; presumably only written while holding
     * submissionLock (see addSubmission) — confirm.
     */
    int queueTop;

    /**
     * True when shutdown() has been called.
     */
    volatile boolean shutdown;

    /**
     * True if local polling uses FIFO order instead of the default LIFO
     * order. Read, and replicated, by ForkJoinWorkerThreads.
     */
    final boolean locallyFifo;

    /**
     * The number of threads in ForkJoinWorkerThread.helpQuiescePool.
     * When non-zero, suppresses automatic shutdown when active
     * counts become zero.
     */
    volatile int quiescerCount;

    /**
     * The number of threads blocked in join.
     */
    volatile int blockedCount;

    /**
     * Counter for worker Thread names (unrelated to their poolIndex).
     */
    private volatile int nextWorkerNumber;

    /**
     * The index for the next created worker. Accessed under scanGuard.
     */
    private int nextWorkerIndex;

    /**
     * SeqLock and index masking for updates to the workers array.
     *
     * <p>The lock is held while the SG_UNIT bit is set. Unlocking clears
     * that bit by adding SG_UNIT. Read-only operations can detect
     * staleness by comparing scanGuard with the value it had before
     * their reads.
     *
     * <p>The low 16 bits (i.e., scanGuard & SMASK) hold the smallest
     * power of two covering all worker indices, minus one. This mask is
     * used to avoid dealing with large numbers of null slots when the
     * workers array is overallocated.
     */
    volatile int scanGuard;

    // scanGuard lock bit: bit 16, just above the SMASK index-mask bits.
    private static final int SG_UNIT = 1 << 16;

    /**
     * The wakeup interval (in nanoseconds) after which a worker that is
     * waiting for a task while the pool is quiescent tries to shrink
     * the number of workers instead. The exact value does not matter
     * much. It must be short enough to release resources during
     * sustained periods of idleness, but not so short that threads are
     * continually re-created.
     */
    private static final long SHRINK_RATE =
        4L * 1000L * 1000L * 1000L; // 4 seconds
   621 
   622     /**
   623      * Top-level loop for worker threads: On each step: if the
   624      * previous step swept through all queues and found no tasks, or
   625      * there are excess threads, then possibly blocks. Otherwise,
   626      * scans for and, if found, executes a task. Returns when pool
   627      * and/or worker terminate.
   628      *
   629      * @param w the worker
   630      */
   631     final void work(ForkJoinWorkerThread w) {
   632         boolean swept = false;                // true on empty scans
   633         long c;
   634         while (!w.terminate && (int)(c = ctl) >= 0) {
   635             int a;                            // active count
   636             if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
   637                 swept = scan(w, a);
   638             else if (tryAwaitWork(w, c))
   639                 swept = false;
   640         }
   641     }
   642 
   643     // Signalling
   644 
   645     /**
   646      * Wakes up or creates a worker.
   647      */
   648     final void signalWork() {
   649         /*
   650          * The while condition is true if: (there is are too few total
   651          * workers OR there is at least one waiter) AND (there are too
   652          * few active workers OR the pool is terminating).  The value
   653          * of e distinguishes the remaining cases: zero (no waiters)
   654          * for create, negative if terminating (in which case do
   655          * nothing), else release a waiter. The secondary checks for
   656          * release (non-null array etc) can fail if the pool begins
   657          * terminating after the test, and don't impose any added cost
   658          * because JVMs must perform null and bounds checks anyway.
   659          */
   660         long c; int e, u;
   661         while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
   662                 (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
   663             if (e > 0) {                         // release a waiting worker
   664                 int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
   665                 if ((ws = workers) == null ||
   666                     (i = ~e & SMASK) >= ws.length ||
   667                     (w = ws[i]) == null)
   668                     break;
   669                 long nc = (((long)(w.nextWait & E_MASK)) |
   670                            ((long)(u + UAC_UNIT) << 32));
   671                 if (w.eventCount == e &&
   672                     UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   673                     w.eventCount = (e + EC_UNIT) & E_MASK;
   674                     if (w.parked)
   675                         UNSAFE.unpark(w);
   676                     break;
   677                 }
   678             }
   679             else if (UNSAFE.compareAndSwapLong
   680                      (this, ctlOffset, c,
   681                       (long)(((u + UTC_UNIT) & UTC_MASK) |
   682                              ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
   683                 addWorker();
   684                 break;
   685             }
   686         }
   687     }
   688 
   689     /**
   690      * Variant of signalWork to help release waiters on rescans.
   691      * Tries once to release a waiter if active count < 0.
   692      *
   693      * @return false if failed due to contention, else true
   694      */
   695     private boolean tryReleaseWaiter() {
   696         long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
   697         if ((e = (int)(c = ctl)) > 0 &&
   698             (int)(c >> AC_SHIFT) < 0 &&
   699             (ws = workers) != null &&
   700             (i = ~e & SMASK) < ws.length &&
   701             (w = ws[i]) != null) {
   702             long nc = ((long)(w.nextWait & E_MASK) |
   703                        ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
   704             if (w.eventCount != e ||
   705                 !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
   706                 return false;
   707             w.eventCount = (e + EC_UNIT) & E_MASK;
   708             if (w.parked)
   709                 UNSAFE.unpark(w);
   710         }
   711         return true;
   712     }
   713 
   714     // Scanning for tasks
   715 
   716     /**
   717      * Scans for and, if found, executes one task. Scans start at a
   718      * random index of workers array, and randomly select the first
   719      * (2*#workers)-1 probes, and then, if all empty, resort to 2
   720      * circular sweeps, which is necessary to check quiescence. and
   721      * taking a submission only if no stealable tasks were found.  The
   722      * steal code inside the loop is a specialized form of
   723      * ForkJoinWorkerThread.deqTask, followed bookkeeping to support
   724      * helpJoinTask and signal propagation. The code for submission
   725      * queues is almost identical. On each steal, the worker completes
   726      * not only the task, but also all local tasks that this task may
   727      * have generated. On detecting staleness or contention when
   728      * trying to take a task, this method returns without finishing
   729      * sweep, which allows global state rechecks before retry.
   730      *
   731      * @param w the worker
   732      * @param a the number of active workers
   733      * @return true if swept all queues without finding a task
   734      */
   735     private boolean scan(ForkJoinWorkerThread w, int a) {
   736         int g = scanGuard; // mask 0 avoids useless scans if only one active
   737         int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
   738         ForkJoinWorkerThread[] ws = workers;
   739         if (ws == null || ws.length <= m)         // staleness check
   740             return false;
   741         for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
   742             ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
   743             ForkJoinWorkerThread v = ws[k & m];
   744             if (v != null && (b = v.queueBase) != v.queueTop &&
   745                 (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
   746                 long u = (i << ASHIFT) + ABASE;
   747                 if ((t = q[i]) != null && v.queueBase == b &&
   748                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
   749                     int d = (v.queueBase = b + 1) - v.queueTop;
   750                     v.stealHint = w.poolIndex;
   751                     if (d != 0)
   752                         signalWork();             // propagate if nonempty
   753                     w.execTask(t);
   754                 }
   755                 r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
   756                 return false;                     // store next seed
   757             }
   758             else if (j < 0) {                     // xorshift
   759                 r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
   760             }
   761             else
   762                 ++k;
   763         }
   764         if (scanGuard != g)                       // staleness check
   765             return false;
   766         else {                                    // try to take submission
   767             ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
   768             if ((b = queueBase) != queueTop &&
   769                 (q = submissionQueue) != null &&
   770                 (i = (q.length - 1) & b) >= 0) {
   771                 long u = (i << ASHIFT) + ABASE;
   772                 if ((t = q[i]) != null && queueBase == b &&
   773                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
   774                     queueBase = b + 1;
   775                     w.execTask(t);
   776                 }
   777                 return false;
   778             }
   779             return true;                         // all queues empty
   780         }
   781     }
   782 
   783     /**
   784      * Tries to enqueue worker w in wait queue and await change in
   785      * worker's eventCount.  If the pool is quiescent and there is
   786      * more than one worker, possibly terminates worker upon exit.
   787      * Otherwise, before blocking, rescans queues to avoid missed
   788      * signals.  Upon finding work, releases at least one worker
   789      * (which may be the current worker). Rescans restart upon
   790      * detected staleness or failure to release due to
   791      * contention. Note the unusual conventions about Thread.interrupt
   792      * here and elsewhere: Because interrupts are used solely to alert
   793      * threads to check termination, which is checked here anyway, we
   794      * clear status (using Thread.interrupted) before any call to
   795      * park, so that park does not immediately return due to status
   796      * being set via some other unrelated call to interrupt in user
   797      * code.
   798      *
   799      * @param w the calling worker
   800      * @param c the ctl value on entry
   801      * @return true if waited or another thread was released upon enq
   802      */
   803     private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
   804         int v = w.eventCount;
   805         w.nextWait = (int)c;                      // w's successor record
   806         long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
   807         if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   808             long d = ctl; // return true if lost to a deq, to force scan
   809             return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
   810         }
   811         for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
   812             long s = stealCount;
   813             if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
   814                 sc = w.stealCount = 0;
   815             else if (w.eventCount != v)
   816                 return true;                      // update next time
   817         }
   818         if ((!shutdown || !tryTerminate(false)) &&
   819             (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
   820             blockedCount == 0 && quiescerCount == 0)
   821             idleAwaitWork(w, nc, c, v);           // quiescent
   822         for (boolean rescanned = false;;) {
   823             if (w.eventCount != v)
   824                 return true;
   825             if (!rescanned) {
   826                 int g = scanGuard, m = g & SMASK;
   827                 ForkJoinWorkerThread[] ws = workers;
   828                 if (ws != null && m < ws.length) {
   829                     rescanned = true;
   830                     for (int i = 0; i <= m; ++i) {
   831                         ForkJoinWorkerThread u = ws[i];
   832                         if (u != null) {
   833                             if (u.queueBase != u.queueTop &&
   834                                 !tryReleaseWaiter())
   835                                 rescanned = false; // contended
   836                             if (w.eventCount != v)
   837                                 return true;
   838                         }
   839                     }
   840                 }
   841                 if (scanGuard != g ||              // stale
   842                     (queueBase != queueTop && !tryReleaseWaiter()))
   843                     rescanned = false;
   844                 if (!rescanned)
   845                     Thread.yield();                // reduce contention
   846                 else
   847                     Thread.interrupted();          // clear before park
   848             }
   849             else {
   850                 w.parked = true;                   // must recheck
   851                 if (w.eventCount != v) {
   852                     w.parked = false;
   853                     return true;
   854                 }
   855                 LockSupport.park(this);
   856                 rescanned = w.parked = false;
   857             }
   858         }
   859     }
   860 
   861     /**
   862      * If inactivating worker w has caused pool to become
   863      * quiescent, check for pool termination, and wait for event
   864      * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
   865      * this case because quiescence reflects consensus about lack
   866      * of work). On timeout, if ctl has not changed, terminate the
   867      * worker. Upon its termination (see deregisterWorker), it may
   868      * wake up another worker to possibly repeat this process.
   869      *
   870      * @param w the calling worker
   871      * @param currentCtl the ctl value after enqueuing w
   872      * @param prevCtl the ctl value if w terminated
   873      * @param v the eventCount w awaits change
   874      */
   875     private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
   876                                long prevCtl, int v) {
   877         if (w.eventCount == v) {
   878             if (shutdown)
   879                 tryTerminate(false);
   880             ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
   881             while (ctl == currentCtl) {
   882                 long startTime = System.nanoTime();
   883                 w.parked = true;
   884                 if (w.eventCount == v)             // must recheck
   885                     LockSupport.parkNanos(this, SHRINK_RATE);
   886                 w.parked = false;
   887                 if (w.eventCount != v)
   888                     break;
   889                 else if (System.nanoTime() - startTime <
   890                          SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
   891                     Thread.interrupted();          // spurious wakeup
   892                 else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
   893                                                    currentCtl, prevCtl)) {
   894                     w.terminate = true;            // restore previous
   895                     w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
   896                     break;
   897                 }
   898             }
   899         }
   900     }
   901 
   902     // Submissions
   903 
   904     /**
   905      * Enqueues the given task in the submissionQueue.  Same idea as
   906      * ForkJoinWorkerThread.pushTask except for use of submissionLock.
   907      *
   908      * @param t the task
   909      */
   910     private void addSubmission(ForkJoinTask<?> t) {
   911         final ReentrantLock lock = this.submissionLock;
   912         lock.lock();
   913         try {
   914             ForkJoinTask<?>[] q; int s, m;
   915             if ((q = submissionQueue) != null) {    // ignore if queue removed
   916                 long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
   917                 UNSAFE.putOrderedObject(q, u, t);
   918                 queueTop = s + 1;
   919                 if (s - queueBase == m)
   920                     growSubmissionQueue();
   921             }
   922         } finally {
   923             lock.unlock();
   924         }
   925         signalWork();
   926     }
   927 
   928     //  (pollSubmission is defined below with exported methods)
   929 
   930     /**
   931      * Creates or doubles submissionQueue array.
   932      * Basically identical to ForkJoinWorkerThread version.
   933      */
   934     private void growSubmissionQueue() {
   935         ForkJoinTask<?>[] oldQ = submissionQueue;
   936         int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
   937         if (size > MAXIMUM_QUEUE_CAPACITY)
   938             throw new RejectedExecutionException("Queue capacity exceeded");
   939         if (size < INITIAL_QUEUE_CAPACITY)
   940             size = INITIAL_QUEUE_CAPACITY;
   941         ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
   942         int mask = size - 1;
   943         int top = queueTop;
   944         int oldMask;
   945         if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
   946             for (int b = queueBase; b != top; ++b) {
   947                 long u = ((b & oldMask) << ASHIFT) + ABASE;
   948                 Object x = UNSAFE.getObjectVolatile(oldQ, u);
   949                 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
   950                     UNSAFE.putObjectVolatile
   951                         (q, ((b & mask) << ASHIFT) + ABASE, x);
   952             }
   953         }
   954     }
   955 
   956     // Blocking support
   957 
   958     /**
   959      * Tries to increment blockedCount, decrement active count
   960      * (sometimes implicitly) and possibly release or create a
   961      * compensating worker in preparation for blocking. Fails
   962      * on contention or termination.
   963      *
   964      * @return true if the caller can block, else should recheck and retry
   965      */
   966     private boolean tryPreBlock() {
   967         int b = blockedCount;
   968         if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
   969             int pc = parallelism;
   970             do {
   971                 ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
   972                 int e, ac, tc, rc, i;
   973                 long c = ctl;
   974                 int u = (int)(c >>> 32);
   975                 if ((e = (int)c) < 0) {
   976                                                  // skip -- terminating
   977                 }
   978                 else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
   979                          (ws = workers) != null &&
   980                          (i = ~e & SMASK) < ws.length &&
   981                          (w = ws[i]) != null) {
   982                     long nc = ((long)(w.nextWait & E_MASK) |
   983                                (c & (AC_MASK|TC_MASK)));
   984                     if (w.eventCount == e &&
   985                         UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   986                         w.eventCount = (e + EC_UNIT) & E_MASK;
   987                         if (w.parked)
   988                             UNSAFE.unpark(w);
   989                         return true;             // release an idle worker
   990                     }
   991                 }
   992                 else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
   993                     long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
   994                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
   995                         return true;             // no compensation needed
   996                 }
   997                 else if (tc + pc < MAX_ID) {
   998                     long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
   999                     if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
  1000                         addWorker();
  1001                         return true;            // create a replacement
  1002                     }
  1003                 }
  1004                 // try to back out on any failure and let caller retry
  1005             } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
  1006                                                b = blockedCount, b - 1));
  1007         }
  1008         return false;
  1009     }
  1010 
  1011     /**
  1012      * Decrements blockedCount and increments active count
  1013      */
  1014     private void postBlock() {
  1015         long c;
  1016         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
  1017                                                 c = ctl, c + AC_UNIT));
  1018         int b;
  1019         do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
  1020                                                b = blockedCount, b - 1));
  1021     }
  1022 
  1023     /**
  1024      * Possibly blocks waiting for the given task to complete, or
  1025      * cancels the task if terminating.  Fails to wait if contended.
  1026      *
  1027      * @param joinMe the task
  1028      */
  1029     final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
  1030         int s;
  1031         Thread.interrupted(); // clear interrupts before checking termination
  1032         if (joinMe.status >= 0) {
  1033             if (tryPreBlock()) {
  1034                 joinMe.tryAwaitDone(0L);
  1035                 postBlock();
  1036             }
  1037             else if ((ctl & STOP_BIT) != 0L)
  1038                 joinMe.cancelIgnoringExceptions();
  1039         }
  1040     }
  1041 
  1042     /**
  1043      * Possibly blocks the given worker waiting for joinMe to
  1044      * complete or timeout
  1045      *
  1046      * @param joinMe the task
  1047      * @param millis the wait time for underlying Object.wait
  1048      */
  1049     final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) {
  1050         while (joinMe.status >= 0) {
  1051             Thread.interrupted();
  1052             if ((ctl & STOP_BIT) != 0L) {
  1053                 joinMe.cancelIgnoringExceptions();
  1054                 break;
  1055             }
  1056             if (tryPreBlock()) {
  1057                 long last = System.nanoTime();
  1058                 while (joinMe.status >= 0) {
  1059                     long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
  1060                     if (millis <= 0)
  1061                         break;
  1062                     joinMe.tryAwaitDone(millis);
  1063                     if (joinMe.status < 0)
  1064                         break;
  1065                     if ((ctl & STOP_BIT) != 0L) {
  1066                         joinMe.cancelIgnoringExceptions();
  1067                         break;
  1068                     }
  1069                     long now = System.nanoTime();
  1070                     nanos -= now - last;
  1071                     last = now;
  1072                 }
  1073                 postBlock();
  1074                 break;
  1075             }
  1076         }
  1077     }
  1078 
  1079     /**
  1080      * If necessary, compensates for blocker, and blocks
  1081      */
  1082     private void awaitBlocker(ManagedBlocker blocker)
  1083         throws InterruptedException {
  1084         while (!blocker.isReleasable()) {
  1085             if (tryPreBlock()) {
  1086                 try {
  1087                     do {} while (!blocker.isReleasable() && !blocker.block());
  1088                 } finally {
  1089                     postBlock();
  1090                 }
  1091                 break;
  1092             }
  1093         }
  1094     }
  1095 
    // Creating, registering and deregistering workers
  1097 
  1098     /**
  1099      * Tries to create and start a worker; minimally rolls back counts
  1100      * on failure.
  1101      */
  1102     private void addWorker() {
  1103         Throwable ex = null;
  1104         ForkJoinWorkerThread t = null;
  1105         try {
  1106             t = factory.newThread(this);
  1107         } catch (Throwable e) {
  1108             ex = e;
  1109         }
  1110         if (t == null) {  // null or exceptional factory return
  1111             long c;       // adjust counts
  1112             do {} while (!UNSAFE.compareAndSwapLong
  1113                          (this, ctlOffset, c = ctl,
  1114                           (((c - AC_UNIT) & AC_MASK) |
  1115                            ((c - TC_UNIT) & TC_MASK) |
  1116                            (c & ~(AC_MASK|TC_MASK)))));
  1117             // Propagate exception if originating from an external caller
  1118             if (!tryTerminate(false) && ex != null &&
  1119                 !(Thread.currentThread() instanceof ForkJoinWorkerThread))
  1120                 UNSAFE.throwException(ex);
  1121         }
  1122         else
  1123             t.start();
  1124     }
  1125 
  1126     /**
  1127      * Callback from ForkJoinWorkerThread constructor to assign a
  1128      * public name
  1129      */
  1130     final String nextWorkerName() {
  1131         for (int n;;) {
  1132             if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
  1133                                          n = nextWorkerNumber, ++n))
  1134                 return workerNamePrefix + n;
  1135         }
  1136     }
  1137 
  1138     /**
  1139      * Callback from ForkJoinWorkerThread constructor to
  1140      * determine its poolIndex and record in workers array.
  1141      *
  1142      * @param w the worker
  1143      * @return the worker's pool index
  1144      */
  1145     final int registerWorker(ForkJoinWorkerThread w) {
  1146         /*
  1147          * In the typical case, a new worker acquires the lock, uses
  1148          * next available index and returns quickly.  Since we should
  1149          * not block callers (ultimately from signalWork or
  1150          * tryPreBlock) waiting for the lock needed to do this, we
  1151          * instead help release other workers while waiting for the
  1152          * lock.
  1153          */
  1154         for (int g;;) {
  1155             ForkJoinWorkerThread[] ws;
  1156             if (((g = scanGuard) & SG_UNIT) == 0 &&
  1157                 UNSAFE.compareAndSwapInt(this, scanGuardOffset,
  1158                                          g, g | SG_UNIT)) {
  1159                 int k = nextWorkerIndex;
  1160                 try {
  1161                     if ((ws = workers) != null) { // ignore on shutdown
  1162                         int n = ws.length;
  1163                         if (k < 0 || k >= n || ws[k] != null) {
  1164                             for (k = 0; k < n && ws[k] != null; ++k)
  1165                                 ;
  1166                             if (k == n)
  1167                                 ws = workers = Arrays.copyOf(ws, n << 1);
  1168                         }
  1169                         ws[k] = w;
  1170                         nextWorkerIndex = k + 1;
  1171                         int m = g & SMASK;
  1172                         g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
  1173                     }
  1174                 } finally {
  1175                     scanGuard = g;
  1176                 }
  1177                 return k;
  1178             }
  1179             else if ((ws = workers) != null) { // help release others
  1180                 for (ForkJoinWorkerThread u : ws) {
  1181                     if (u != null && u.queueBase != u.queueTop) {
  1182                         if (tryReleaseWaiter())
  1183                             break;
  1184                     }
  1185                 }
  1186             }
  1187         }
  1188     }
  1189 
  1190     /**
  1191      * Final callback from terminating worker.  Removes record of
  1192      * worker from array, and adjusts counts. If pool is shutting
  1193      * down, tries to complete termination.
  1194      *
  1195      * @param w the worker
  1196      */
  1197     final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
  1198         int idx = w.poolIndex;
  1199         int sc = w.stealCount;
  1200         int steps = 0;
  1201         // Remove from array, adjust worker counts and collect steal count.
  1202         // We can intermix failed removes or adjusts with steal updates
  1203         do {
  1204             long s, c;
  1205             int g;
  1206             if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
  1207                 UNSAFE.compareAndSwapInt(this, scanGuardOffset,
  1208                                          g, g |= SG_UNIT)) {
  1209                 ForkJoinWorkerThread[] ws = workers;
  1210                 if (ws != null && idx >= 0 &&
  1211                     idx < ws.length && ws[idx] == w)
  1212                     ws[idx] = null;    // verify
  1213                 nextWorkerIndex = idx;
  1214                 scanGuard = g + SG_UNIT;
  1215                 steps = 1;
  1216             }
  1217             if (steps == 1 &&
  1218                 UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
  1219                                           (((c - AC_UNIT) & AC_MASK) |
  1220                                            ((c - TC_UNIT) & TC_MASK) |
  1221                                            (c & ~(AC_MASK|TC_MASK)))))
  1222                 steps = 2;
  1223             if (sc != 0 &&
  1224                 UNSAFE.compareAndSwapLong(this, stealCountOffset,
  1225                                           s = stealCount, s + sc))
  1226                 sc = 0;
  1227         } while (steps != 2 || sc != 0);
  1228         if (!tryTerminate(false)) {
  1229             if (ex != null)   // possibly replace if died abnormally
  1230                 signalWork();
  1231             else
  1232                 tryReleaseWaiter();
  1233         }
  1234     }
  1235 
  1236     // Shutdown and termination
  1237 
  1238     /**
  1239      * Possibly initiates and/or completes termination.
  1240      *
  1241      * @param now if true, unconditionally terminate, else only
  1242      * if shutdown and empty queue and no active workers
  1243      * @return true if now terminating or terminated
  1244      */
  1245     private boolean tryTerminate(boolean now) {
  1246         long c;
  1247         while (((c = ctl) & STOP_BIT) == 0) {
  1248             if (!now) {
  1249                 if ((int)(c >> AC_SHIFT) != -parallelism)
  1250                     return false;
  1251                 if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
  1252                     queueBase != queueTop) {
  1253                     if (ctl == c) // staleness check
  1254                         return false;
  1255                     continue;
  1256                 }
  1257             }
  1258             if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
  1259                 startTerminating();
  1260         }
  1261         if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
  1262             final ReentrantLock lock = this.submissionLock;
  1263             lock.lock();
  1264             try {
  1265                 termination.signalAll();
  1266             } finally {
  1267                 lock.unlock();
  1268             }
  1269         }
  1270         return true;
  1271     }
  1272 
  1273     /**
  1274      * Runs up to three passes through workers: (0) Setting
  1275      * termination status for each worker, followed by wakeups up to
  1276      * queued workers; (1) helping cancel tasks; (2) interrupting
  1277      * lagging threads (likely in external tasks, but possibly also
  1278      * blocked in joins).  Each pass repeats previous steps because of
  1279      * potential lagging thread creation.
  1280      */
  1281     private void startTerminating() {
  1282         cancelSubmissions();
  1283         for (int pass = 0; pass < 3; ++pass) {
  1284             ForkJoinWorkerThread[] ws = workers;
  1285             if (ws != null) {
  1286                 for (ForkJoinWorkerThread w : ws) {
  1287                     if (w != null) {
  1288                         w.terminate = true;
  1289                         if (pass > 0) {
  1290                             w.cancelTasks();
  1291                             if (pass > 1 && !w.isInterrupted()) {
  1292                                 try {
  1293                                     w.interrupt();
  1294                                 } catch (SecurityException ignore) {
  1295                                 }
  1296                             }
  1297                         }
  1298                     }
  1299                 }
  1300                 terminateWaiters();
  1301             }
  1302         }
  1303     }
  1304 
  1305     /**
  1306      * Polls and cancels all submissions. Called only during termination.
  1307      */
  1308     private void cancelSubmissions() {
  1309         while (queueBase != queueTop) {
  1310             ForkJoinTask<?> task = pollSubmission();
  1311             if (task != null) {
  1312                 try {
  1313                     task.cancel(false);
  1314                 } catch (Throwable ignore) {
  1315                 }
  1316             }
  1317         }
  1318     }
  1319 
    /**
     * Tries to set the termination status of waiting workers, and
     * then wakes them up (after which they will terminate).
     *
     * <p>The low bits of {@code ctl} identify (bit-inverted) the index
     * of a waiting worker. While that index names an existing worker
     * whose {@code eventCount} matches the recorded event, a CAS replaces
     * those low bits with the worker's {@code nextWait}, increments the
     * active count, and keeps the total count and STOP_BIT. The worker
     * whose slot was removed is then marked terminating and unparked.
     */
    private void terminateWaiters() {
        ForkJoinWorkerThread[] ws = workers;
        if (ws != null) {
            ForkJoinWorkerThread w; long c; int i, e;
            int n = ws.length;
            // e = low 32 bits of ctl; i = index encoded (inverted) in e
            while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
                   (w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
                // New ctl: next waiter, one more active worker, total
                // count and stop status unchanged.
                if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
                                              (long)(w.nextWait & E_MASK) |
                                              ((c + AC_UNIT) & AC_MASK) |
                                              (c & (TC_MASK|STOP_BIT)))) {
                    w.terminate = true;
                    w.eventCount = e + EC_UNIT;
                    // Only unpark workers that are actually parked.
                    if (w.parked)
                        UNSAFE.unpark(w);
                }
            }
        }
    }
  1343 
  1344     // misc ForkJoinWorkerThread support
  1345 
  1346     /**
  1347      * Increment or decrement quiescerCount. Needed only to prevent
  1348      * triggering shutdown if a worker is transiently inactive while
  1349      * checking quiescence.
  1350      *
  1351      * @param delta 1 for increment, -1 for decrement
  1352      */
  1353     final void addQuiescerCount(int delta) {
  1354         int c;
  1355         do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
  1356                                                c = quiescerCount, c + delta));
  1357     }
  1358 
  1359     /**
  1360      * Directly increment or decrement active count without
  1361      * queuing. This method is used to transiently assert inactivation
  1362      * while checking quiescence.
  1363      *
  1364      * @param delta 1 for increment, -1 for decrement
  1365      */
  1366     final void addActiveCount(int delta) {
  1367         long d = delta < 0 ? -AC_UNIT : AC_UNIT;
  1368         long c;
  1369         do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
  1370                                                 ((c + d) & AC_MASK) |
  1371                                                 (c & ~AC_MASK)));
  1372     }
  1373 
  1374     /**
  1375      * Returns the approximate (non-atomic) number of idle threads per
  1376      * active thread.
  1377      */
  1378     final int idlePerActive() {
  1379         // Approximate at powers of two for small values, saturate past 4
  1380         int p = parallelism;
  1381         int a = p + (int)(ctl >> AC_SHIFT);
  1382         return (a > (p >>>= 1) ? 0 :
  1383                 a > (p >>>= 1) ? 1 :
  1384                 a > (p >>>= 1) ? 2 :
  1385                 a > (p >>>= 1) ? 4 :
  1386                 8);
  1387     }
  1388 
  1389     // Exported methods
  1390 
  1391     // Constructors
  1392 
  1393     /**
  1394      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
  1395      * java.lang.Runtime#availableProcessors}, using the {@linkplain
  1396      * #defaultForkJoinWorkerThreadFactory default thread factory},
  1397      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  1398      *
  1399      * @throws SecurityException if a security manager exists and
  1400      *         the caller is not permitted to modify threads
  1401      *         because it does not hold {@link
  1402      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1403      */
  1404     public ForkJoinPool() {
  1405         this(1,
  1406              defaultForkJoinWorkerThreadFactory, null, false);
  1407     }
  1408 
  1409     /**
  1410      * Creates a {@code ForkJoinPool} with the indicated parallelism
  1411      * level, the {@linkplain
  1412      * #defaultForkJoinWorkerThreadFactory default thread factory},
  1413      * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  1414      *
  1415      * @param parallelism the parallelism level
  1416      * @throws IllegalArgumentException if parallelism less than or
  1417      *         equal to zero, or greater than implementation limit
  1418      * @throws SecurityException if a security manager exists and
  1419      *         the caller is not permitted to modify threads
  1420      *         because it does not hold {@link
  1421      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1422      */
  1423     public ForkJoinPool(int parallelism) {
  1424         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
  1425     }
  1426 
  1427     /**
  1428      * Creates a {@code ForkJoinPool} with the given parameters.
  1429      *
  1430      * @param parallelism the parallelism level. For default value,
  1431      * use {@link java.lang.Runtime#availableProcessors}.
  1432      * @param factory the factory for creating new threads. For default value,
  1433      * use {@link #defaultForkJoinWorkerThreadFactory}.
  1434      * @param handler the handler for internal worker threads that
  1435      * terminate due to unrecoverable errors encountered while executing
  1436      * tasks. For default value, use {@code null}.
  1437      * @param asyncMode if true,
  1438      * establishes local first-in-first-out scheduling mode for forked
  1439      * tasks that are never joined. This mode may be more appropriate
  1440      * than default locally stack-based mode in applications in which
  1441      * worker threads only process event-style asynchronous tasks.
  1442      * For default value, use {@code false}.
  1443      * @throws IllegalArgumentException if parallelism less than or
  1444      *         equal to zero, or greater than implementation limit
  1445      * @throws NullPointerException if the factory is null
  1446      * @throws SecurityException if a security manager exists and
  1447      *         the caller is not permitted to modify threads
  1448      *         because it does not hold {@link
  1449      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1450      */
  1451     public ForkJoinPool(int parallelism,
  1452                         ForkJoinWorkerThreadFactory factory,
  1453                         Thread.UncaughtExceptionHandler handler,
  1454                         boolean asyncMode) {
  1455         checkPermission();
  1456         if (factory == null)
  1457             throw new NullPointerException();
  1458         if (parallelism <= 0 || parallelism > MAX_ID)
  1459             throw new IllegalArgumentException();
  1460         this.parallelism = parallelism;
  1461         this.factory = factory;
  1462         this.ueh = handler;
  1463         this.locallyFifo = asyncMode;
  1464         long np = (long)(-parallelism); // offset ctl counts
  1465         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  1466         this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
  1467         // initialize workers array with room for 2*parallelism if possible
  1468         int n = parallelism << 1;
  1469         if (n >= MAX_ID)
  1470             n = MAX_ID;
  1471         else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
  1472             n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
  1473         }
  1474         workers = new ForkJoinWorkerThread[n + 1];
  1475         this.submissionLock = new ReentrantLock();
  1476         this.termination = submissionLock.newCondition();
  1477         StringBuilder sb = new StringBuilder("ForkJoinPool-");
  1478         sb.append(poolNumberGenerator.incrementAndGet());
  1479         sb.append("-worker-");
  1480         this.workerNamePrefix = sb.toString();
  1481     }
  1482 
  1483     // Execution methods
  1484 
  1485     /**
  1486      * Performs the given task, returning its result upon completion.
  1487      * If the computation encounters an unchecked Exception or Error,
  1488      * it is rethrown as the outcome of this invocation.  Rethrown
  1489      * exceptions behave in the same way as regular exceptions, but,
  1490      * when possible, contain stack traces (as displayed for example
  1491      * using {@code ex.printStackTrace()}) of both the current thread
  1492      * as well as the thread actually encountering the exception;
  1493      * minimally only the latter.
  1494      *
  1495      * @param task the task
  1496      * @return the task's result
  1497      * @throws NullPointerException if the task is null
  1498      * @throws RejectedExecutionException if the task cannot be
  1499      *         scheduled for execution
  1500      */
  1501     public <T> T invoke(ForkJoinTask<T> task) {
  1502         Thread t = Thread.currentThread();
  1503         if (task == null)
  1504             throw new NullPointerException();
  1505         if (shutdown)
  1506             throw new RejectedExecutionException();
  1507         if ((t instanceof ForkJoinWorkerThread) &&
  1508             ((ForkJoinWorkerThread)t).pool == this)
  1509             return task.invoke();  // bypass submit if in same pool
  1510         else {
  1511             addSubmission(task);
  1512             return task.join();
  1513         }
  1514     }
  1515 
  1516     /**
  1517      * Unless terminating, forks task if within an ongoing FJ
  1518      * computation in the current pool, else submits as external task.
  1519      */
  1520     private <T> void forkOrSubmit(ForkJoinTask<T> task) {
  1521         ForkJoinWorkerThread w;
  1522         Thread t = Thread.currentThread();
  1523         if (shutdown)
  1524             throw new RejectedExecutionException();
  1525         if ((t instanceof ForkJoinWorkerThread) &&
  1526             (w = (ForkJoinWorkerThread)t).pool == this)
  1527             w.pushTask(task);
  1528         else
  1529             addSubmission(task);
  1530     }
  1531 
  1532     /**
  1533      * Arranges for (asynchronous) execution of the given task.
  1534      *
  1535      * @param task the task
  1536      * @throws NullPointerException if the task is null
  1537      * @throws RejectedExecutionException if the task cannot be
  1538      *         scheduled for execution
  1539      */
  1540     public void execute(ForkJoinTask<?> task) {
  1541         if (task == null)
  1542             throw new NullPointerException();
  1543         forkOrSubmit(task);
  1544     }
  1545 
  1546     // AbstractExecutorService methods
  1547 
  1548     /**
  1549      * @throws NullPointerException if the task is null
  1550      * @throws RejectedExecutionException if the task cannot be
  1551      *         scheduled for execution
  1552      */
  1553     public void execute(Runnable task) {
  1554         if (task == null)
  1555             throw new NullPointerException();
  1556         ForkJoinTask<?> job;
  1557         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
  1558             job = (ForkJoinTask<?>) task;
  1559         else
  1560             job = ForkJoinTask.adapt(task, null);
  1561         forkOrSubmit(job);
  1562     }
  1563 
  1564     /**
  1565      * Submits a ForkJoinTask for execution.
  1566      *
  1567      * @param task the task to submit
  1568      * @return the task
  1569      * @throws NullPointerException if the task is null
  1570      * @throws RejectedExecutionException if the task cannot be
  1571      *         scheduled for execution
  1572      */
  1573     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  1574         if (task == null)
  1575             throw new NullPointerException();
  1576         forkOrSubmit(task);
  1577         return task;
  1578     }
  1579 
  1580     /**
  1581      * @throws NullPointerException if the task is null
  1582      * @throws RejectedExecutionException if the task cannot be
  1583      *         scheduled for execution
  1584      */
  1585     public <T> ForkJoinTask<T> submit(Callable<T> task) {
  1586         if (task == null)
  1587             throw new NullPointerException();
  1588         ForkJoinTask<T> job = ForkJoinTask.adapt(task);
  1589         forkOrSubmit(job);
  1590         return job;
  1591     }
  1592 
  1593     /**
  1594      * @throws NullPointerException if the task is null
  1595      * @throws RejectedExecutionException if the task cannot be
  1596      *         scheduled for execution
  1597      */
  1598     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
  1599         if (task == null)
  1600             throw new NullPointerException();
  1601         ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
  1602         forkOrSubmit(job);
  1603         return job;
  1604     }
  1605 
  1606     /**
  1607      * @throws NullPointerException if the task is null
  1608      * @throws RejectedExecutionException if the task cannot be
  1609      *         scheduled for execution
  1610      */
  1611     public ForkJoinTask<?> submit(Runnable task) {
  1612         if (task == null)
  1613             throw new NullPointerException();
  1614         ForkJoinTask<?> job;
  1615         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
  1616             job = (ForkJoinTask<?>) task;
  1617         else
  1618             job = ForkJoinTask.adapt(task, null);
  1619         forkOrSubmit(job);
  1620         return job;
  1621     }
  1622 
  1623     /**
  1624      * @throws NullPointerException       {@inheritDoc}
  1625      * @throws RejectedExecutionException {@inheritDoc}
  1626      */
  1627     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
  1628         ArrayList<ForkJoinTask<T>> forkJoinTasks =
  1629             new ArrayList<ForkJoinTask<T>>(tasks.size());
  1630         for (Callable<T> task : tasks)
  1631             forkJoinTasks.add(ForkJoinTask.adapt(task));
  1632         invoke(new InvokeAll<T>(forkJoinTasks));
  1633 
  1634         @SuppressWarnings({"unchecked", "rawtypes"})
  1635             List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
  1636         return futures;
  1637     }
  1638 
  1639     static final class InvokeAll<T> extends RecursiveAction {
  1640         final ArrayList<ForkJoinTask<T>> tasks;
  1641         InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
  1642         public void compute() {
  1643             try { invokeAll(tasks); }
  1644             catch (Exception ignore) {}
  1645         }
  1646         private static final long serialVersionUID = -7914297376763021607L;
  1647     }
  1648 
  1649     /**
  1650      * Returns the factory used for constructing new workers.
  1651      *
  1652      * @return the factory used for constructing new workers
  1653      */
  1654     public ForkJoinWorkerThreadFactory getFactory() {
  1655         return factory;
  1656     }
  1657 
  1658     /**
  1659      * Returns the handler for internal worker threads that terminate
  1660      * due to unrecoverable errors encountered while executing tasks.
  1661      *
  1662      * @return the handler, or {@code null} if none
  1663      */
  1664     public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
  1665         return ueh;
  1666     }
  1667 
  1668     /**
  1669      * Returns the targeted parallelism level of this pool.
  1670      *
  1671      * @return the targeted parallelism level of this pool
  1672      */
  1673     public int getParallelism() {
  1674         return parallelism;
  1675     }
  1676 
  1677     /**
  1678      * Returns the number of worker threads that have started but not
  1679      * yet terminated.  The result returned by this method may differ
  1680      * from {@link #getParallelism} when threads are created to
  1681      * maintain parallelism when others are cooperatively blocked.
  1682      *
  1683      * @return the number of worker threads
  1684      */
  1685     public int getPoolSize() {
  1686         return parallelism + (short)(ctl >>> TC_SHIFT);
  1687     }
  1688 
  1689     /**
  1690      * Returns {@code true} if this pool uses local first-in-first-out
  1691      * scheduling mode for forked tasks that are never joined.
  1692      *
  1693      * @return {@code true} if this pool uses async mode
  1694      */
  1695     public boolean getAsyncMode() {
  1696         return locallyFifo;
  1697     }
  1698 
  1699     /**
  1700      * Returns an estimate of the number of worker threads that are
  1701      * not blocked waiting to join tasks or for other managed
  1702      * synchronization. This method may overestimate the
  1703      * number of running threads.
  1704      *
  1705      * @return the number of worker threads
  1706      */
  1707     public int getRunningThreadCount() {
  1708         int r = parallelism + (int)(ctl >> AC_SHIFT);
  1709         return (r <= 0) ? 0 : r; // suppress momentarily negative values
  1710     }
  1711 
  1712     /**
  1713      * Returns an estimate of the number of threads that are currently
  1714      * stealing or executing tasks. This method may overestimate the
  1715      * number of active threads.
  1716      *
  1717      * @return the number of active threads
  1718      */
  1719     public int getActiveThreadCount() {
  1720         int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
  1721         return (r <= 0) ? 0 : r; // suppress momentarily negative values
  1722     }
  1723 
  1724     /**
  1725      * Returns {@code true} if all worker threads are currently idle.
  1726      * An idle worker is one that cannot obtain a task to execute
  1727      * because none are available to steal from other threads, and
  1728      * there are no pending submissions to the pool. This method is
  1729      * conservative; it might not return {@code true} immediately upon
  1730      * idleness of all threads, but will eventually become true if
  1731      * threads remain inactive.
  1732      *
  1733      * @return {@code true} if all threads are currently idle
  1734      */
  1735     public boolean isQuiescent() {
  1736         return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0;
  1737     }
  1738 
  1739     /**
  1740      * Returns an estimate of the total number of tasks stolen from
  1741      * one thread's work queue by another. The reported value
  1742      * underestimates the actual total number of steals when the pool
  1743      * is not quiescent. This value may be useful for monitoring and
  1744      * tuning fork/join programs: in general, steal counts should be
  1745      * high enough to keep threads busy, but low enough to avoid
  1746      * overhead and contention across threads.
  1747      *
  1748      * @return the number of steals
  1749      */
  1750     public long getStealCount() {
  1751         return stealCount;
  1752     }
  1753 
  1754     /**
  1755      * Returns an estimate of the total number of tasks currently held
  1756      * in queues by worker threads (but not including tasks submitted
  1757      * to the pool that have not begun executing). This value is only
  1758      * an approximation, obtained by iterating across all threads in
  1759      * the pool. This method may be useful for tuning task
  1760      * granularities.
  1761      *
  1762      * @return the number of queued tasks
  1763      */
  1764     public long getQueuedTaskCount() {
  1765         long count = 0;
  1766         ForkJoinWorkerThread[] ws;
  1767         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
  1768             (ws = workers) != null) {
  1769             for (ForkJoinWorkerThread w : ws)
  1770                 if (w != null)
  1771                     count -= w.queueBase - w.queueTop; // must read base first
  1772         }
  1773         return count;
  1774     }
  1775 
  1776     /**
  1777      * Returns an estimate of the number of tasks submitted to this
  1778      * pool that have not yet begun executing.  This method may take
  1779      * time proportional to the number of submissions.
  1780      *
  1781      * @return the number of queued submissions
  1782      */
  1783     public int getQueuedSubmissionCount() {
  1784         return -queueBase + queueTop;
  1785     }
  1786 
  1787     /**
  1788      * Returns {@code true} if there are any tasks submitted to this
  1789      * pool that have not yet begun executing.
  1790      *
  1791      * @return {@code true} if there are any queued submissions
  1792      */
  1793     public boolean hasQueuedSubmissions() {
  1794         return queueBase != queueTop;
  1795     }
  1796 
    /**
     * Removes and returns the next unexecuted submission if one is
     * available.  This method may be useful in extensions to this
     * class that re-assign work in systems with multiple pools.
     *
     * <p>While the submission queue appears non-empty, tries to claim
     * the slot at {@code queueBase} by CAS-ing it to null. Only the
     * thread whose CAS succeeds advances {@code queueBase} and returns
     * the task; if the slot is null or the base moved, the loop
     * re-reads and retries.
     *
     * @return the next submission, or {@code null} if none
     */
    protected ForkJoinTask<?> pollSubmission() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        // NOTE(review): masking with (q.length - 1) assumes the queue
        // length is a power of two -- confirm against queue growth code.
        while ((b = queueBase) != queueTop &&
               (q = submissionQueue) != null &&
               (i = (q.length - 1) & b) >= 0) {
            // Raw offset of slot i, as required by the Unsafe CAS below.
            long u = (i << ASHIFT) + ABASE;
            if ((t = q[i]) != null &&
                queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t, null)) {
                queueBase = b + 1;
                return t;
            }
        }
        return null;
    }
  1819 
  1820     /**
  1821      * Removes all available unexecuted submitted and forked tasks
  1822      * from scheduling queues and adds them to the given collection,
  1823      * without altering their execution status. These may include
  1824      * artificially generated or wrapped tasks. This method is
  1825      * designed to be invoked only when the pool is known to be
  1826      * quiescent. Invocations at other times may not remove all
  1827      * tasks. A failure encountered while attempting to add elements
  1828      * to collection {@code c} may result in elements being in
  1829      * neither, either or both collections when the associated
  1830      * exception is thrown.  The behavior of this operation is
  1831      * undefined if the specified collection is modified while the
  1832      * operation is in progress.
  1833      *
  1834      * @param c the collection to transfer elements into
  1835      * @return the number of elements transferred
  1836      */
  1837     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
  1838         int count = 0;
  1839         while (queueBase != queueTop) {
  1840             ForkJoinTask<?> t = pollSubmission();
  1841             if (t != null) {
  1842                 c.add(t);
  1843                 ++count;
  1844             }
  1845         }
  1846         ForkJoinWorkerThread[] ws;
  1847         if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
  1848             (ws = workers) != null) {
  1849             for (ForkJoinWorkerThread w : ws)
  1850                 if (w != null)
  1851                     count += w.drainTasksTo(c);
  1852         }
  1853         return count;
  1854     }
  1855 
  1856     /**
  1857      * Returns a string identifying this pool, as well as its state,
  1858      * including indications of run state, parallelism level, and
  1859      * worker and task counts.
  1860      *
  1861      * @return a string identifying this pool, as well as its state
  1862      */
  1863     public String toString() {
  1864         long st = getStealCount();
  1865         long qt = getQueuedTaskCount();
  1866         long qs = getQueuedSubmissionCount();
  1867         int pc = parallelism;
  1868         long c = ctl;
  1869         int tc = pc + (short)(c >>> TC_SHIFT);
  1870         int rc = pc + (int)(c >> AC_SHIFT);
  1871         if (rc < 0) // ignore transient negative
  1872             rc = 0;
  1873         int ac = rc + blockedCount;
  1874         String level;
  1875         if ((c & STOP_BIT) != 0)
  1876             level = (tc == 0) ? "Terminated" : "Terminating";
  1877         else
  1878             level = shutdown ? "Shutting down" : "Running";
  1879         return super.toString() +
  1880             "[" + level +
  1881             ", parallelism = " + pc +
  1882             ", size = " + tc +
  1883             ", active = " + ac +
  1884             ", running = " + rc +
  1885             ", steals = " + st +
  1886             ", tasks = " + qt +
  1887             ", submissions = " + qs +
  1888             "]";
  1889     }
  1890 
  1891     /**
  1892      * Initiates an orderly shutdown in which previously submitted
  1893      * tasks are executed, but no new tasks will be accepted.
  1894      * Invocation has no additional effect if already shut down.
  1895      * Tasks that are in the process of being submitted concurrently
  1896      * during the course of this method may or may not be rejected.
  1897      *
  1898      * @throws SecurityException if a security manager exists and
  1899      *         the caller is not permitted to modify threads
  1900      *         because it does not hold {@link
  1901      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1902      */
  1903     public void shutdown() {
  1904         checkPermission();
  1905         shutdown = true;
  1906         tryTerminate(false);
  1907     }
  1908 
  1909     /**
  1910      * Attempts to cancel and/or stop all tasks, and reject all
  1911      * subsequently submitted tasks.  Tasks that are in the process of
  1912      * being submitted or executed concurrently during the course of
  1913      * this method may or may not be rejected. This method cancels
  1914      * both existing and unexecuted tasks, in order to permit
  1915      * termination in the presence of task dependencies. So the method
  1916      * always returns an empty list (unlike the case for some other
  1917      * Executors).
  1918      *
  1919      * @return an empty list
  1920      * @throws SecurityException if a security manager exists and
  1921      *         the caller is not permitted to modify threads
  1922      *         because it does not hold {@link
  1923      *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1924      */
  1925     public List<Runnable> shutdownNow() {
  1926         checkPermission();
  1927         shutdown = true;
  1928         tryTerminate(true);
  1929         return Collections.emptyList();
  1930     }
  1931 
  1932     /**
  1933      * Returns {@code true} if all tasks have completed following shut down.
  1934      *
  1935      * @return {@code true} if all tasks have completed following shut down
  1936      */
  1937     public boolean isTerminated() {
  1938         long c = ctl;
  1939         return ((c & STOP_BIT) != 0L &&
  1940                 (short)(c >>> TC_SHIFT) == -parallelism);
  1941     }
  1942 
  1943     /**
  1944      * Returns {@code true} if the process of termination has
  1945      * commenced but not yet completed.  This method may be useful for
  1946      * debugging. A return of {@code true} reported a sufficient
  1947      * period after shutdown may indicate that submitted tasks have
  1948      * ignored or suppressed interruption, or are waiting for IO,
  1949      * causing this executor not to properly terminate. (See the
  1950      * advisory notes for class {@link ForkJoinTask} stating that
  1951      * tasks should not normally entail blocking operations.  But if
  1952      * they do, they must abort them on interrupt.)
  1953      *
  1954      * @return {@code true} if terminating but not yet terminated
  1955      */
  1956     public boolean isTerminating() {
  1957         long c = ctl;
  1958         return ((c & STOP_BIT) != 0L &&
  1959                 (short)(c >>> TC_SHIFT) != -parallelism);
  1960     }
  1961 
  1962     /**
  1963      * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
  1964      */
  1965     final boolean isAtLeastTerminating() {
  1966         return (ctl & STOP_BIT) != 0L;
  1967     }
  1968 
  1969     /**
  1970      * Returns {@code true} if this pool has been shut down.
  1971      *
  1972      * @return {@code true} if this pool has been shut down
  1973      */
  1974     public boolean isShutdown() {
  1975         return shutdown;
  1976     }
  1977 
  1978     /**
  1979      * Blocks until all tasks have completed execution after a shutdown
  1980      * request, or the timeout occurs, or the current thread is
  1981      * interrupted, whichever happens first.
  1982      *
  1983      * @param timeout the maximum time to wait
  1984      * @param unit the time unit of the timeout argument
  1985      * @return {@code true} if this executor terminated and
  1986      *         {@code false} if the timeout elapsed before termination
  1987      * @throws InterruptedException if interrupted while waiting
  1988      */
  1989     public boolean awaitTermination(long timeout, TimeUnit unit)
  1990         throws InterruptedException {
  1991         long nanos = unit.toNanos(timeout);
  1992         final ReentrantLock lock = this.submissionLock;
  1993         lock.lock();
  1994         try {
  1995             for (;;) {
  1996                 if (isTerminated())
  1997                     return true;
  1998                 if (nanos <= 0)
  1999                     return false;
  2000                 nanos = termination.awaitNanos(nanos);
  2001             }
  2002         } finally {
  2003             lock.unlock();
  2004         }
  2005     }
  2006 
  2007     /**
  2008      * Interface for extending managed parallelism for tasks running
  2009      * in {@link ForkJoinPool}s.
  2010      *
  2011      * <p>A {@code ManagedBlocker} provides two methods.  Method
  2012      * {@code isReleasable} must return {@code true} if blocking is
  2013      * not necessary. Method {@code block} blocks the current thread
  2014      * if necessary (perhaps internally invoking {@code isReleasable}
  2015      * before actually blocking). These actions are performed by any
  2016      * thread invoking {@link ForkJoinPool#managedBlock}.  The
  2017      * unusual methods in this API accommodate synchronizers that may,
  2018      * but don't usually, block for long periods. Similarly, they
  2019      * allow more efficient internal handling of cases in which
  2020      * additional workers may be, but usually are not, needed to
  2021      * ensure sufficient parallelism.  Toward this end,
  2022      * implementations of method {@code isReleasable} must be amenable
  2023      * to repeated invocation.
  2024      *
  2025      * <p>For example, here is a ManagedBlocker based on a
  2026      * ReentrantLock:
  2027      *  <pre> {@code
  2028      * class ManagedLocker implements ManagedBlocker {
  2029      *   final ReentrantLock lock;
  2030      *   boolean hasLock = false;
  2031      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
  2032      *   public boolean block() {
  2033      *     if (!hasLock)
  2034      *       lock.lock();
  2035      *     return true;
  2036      *   }
  2037      *   public boolean isReleasable() {
  2038      *     return hasLock || (hasLock = lock.tryLock());
  2039      *   }
  2040      * }}</pre>
  2041      *
  2042      * <p>Here is a class that possibly blocks waiting for an
  2043      * item on a given queue:
  2044      *  <pre> {@code
  2045      * class QueueTaker<E> implements ManagedBlocker {
  2046      *   final BlockingQueue<E> queue;
  2047      *   volatile E item = null;
  2048      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
  2049      *   public boolean block() throws InterruptedException {
  2050      *     if (item == null)
  2051      *       item = queue.take();
  2052      *     return true;
  2053      *   }
  2054      *   public boolean isReleasable() {
  2055      *     return item != null || (item = queue.poll()) != null;
  2056      *   }
  2057      *   public E getItem() { // call after pool.managedBlock completes
  2058      *     return item;
  2059      *   }
  2060      * }}</pre>
  2061      */
  2062     public static interface ManagedBlocker {
  2063         /**
  2064          * Possibly blocks the current thread, for example waiting for
  2065          * a lock or condition.
  2066          *
  2067          * @return {@code true} if no additional blocking is necessary
  2068          * (i.e., if isReleasable would return true)
  2069          * @throws InterruptedException if interrupted while waiting
  2070          * (the method is not required to do so, but is allowed to)
  2071          */
  2072         boolean block() throws InterruptedException;
  2073 
  2074         /**
  2075          * Returns {@code true} if blocking is unnecessary.
  2076          */
  2077         boolean isReleasable();
  2078     }
  2079 
  2080     /**
  2081      * Blocks in accord with the given blocker.  If the current thread
  2082      * is a {@link ForkJoinWorkerThread}, this method possibly
  2083      * arranges for a spare thread to be activated if necessary to
  2084      * ensure sufficient parallelism while the current thread is blocked.
  2085      *
  2086      * <p>If the caller is not a {@link ForkJoinTask}, this method is
  2087      * behaviorally equivalent to
  2088      *  <pre> {@code
  2089      * while (!blocker.isReleasable())
  2090      *   if (blocker.block())
  2091      *     return;
  2092      * }</pre>
  2093      *
  2094      * If the caller is a {@code ForkJoinTask}, then the pool may
  2095      * first be expanded to ensure parallelism, and later adjusted.
  2096      *
  2097      * @param blocker the blocker
  2098      * @throws InterruptedException if blocker.block did so
  2099      */
  2100     public static void managedBlock(ManagedBlocker blocker)
  2101         throws InterruptedException {
  2102         Thread t = Thread.currentThread();
  2103         if (t instanceof ForkJoinWorkerThread) {
  2104             ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
  2105             w.pool.awaitBlocker(blocker);
  2106         }
  2107         else {
  2108             do {} while (!blocker.isReleasable() && !blocker.block());
  2109         }
  2110     }
  2111 
    // AbstractExecutorService overrides. These rely on the undocumented
    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
    // implement RunnableFuture.
  2115 
  2116     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  2117         return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
  2118     }
  2119 
  2120     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  2121         return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
  2122     }
  2123 
  2124     // Unsafe mechanics
  2125     private static final long ctlOffset;
  2126     private static final long stealCountOffset;
  2127     private static final long blockedCountOffset;
  2128     private static final long quiescerCountOffset;
  2129     private static final long scanGuardOffset;
  2130     private static final long nextWorkerNumberOffset;
  2131     private static final long ABASE;
  2132     private static final int ASHIFT;
  2133 
  2134     static {
  2135         poolNumberGenerator = new AtomicInteger();
  2136         workerSeedGenerator = new Random();
  2137         modifyThreadPermission = new RuntimePermission("modifyThread");
  2138         defaultForkJoinWorkerThreadFactory =
  2139             new DefaultForkJoinWorkerThreadFactory();
  2140         int s;
  2141         try {
  2142             UNSAFE = sun.misc.Unsafe.getUnsafe();
  2143             Class k = ForkJoinPool.class;
  2144             ctlOffset = UNSAFE.objectFieldOffset
  2145                 (k.getDeclaredField("ctl"));
  2146             stealCountOffset = UNSAFE.objectFieldOffset
  2147                 (k.getDeclaredField("stealCount"));
  2148             blockedCountOffset = UNSAFE.objectFieldOffset
  2149                 (k.getDeclaredField("blockedCount"));
  2150             quiescerCountOffset = UNSAFE.objectFieldOffset
  2151                 (k.getDeclaredField("quiescerCount"));
  2152             scanGuardOffset = UNSAFE.objectFieldOffset
  2153                 (k.getDeclaredField("scanGuard"));
  2154             nextWorkerNumberOffset = UNSAFE.objectFieldOffset
  2155                 (k.getDeclaredField("nextWorkerNumber"));
  2156             Class a = ForkJoinTask[].class;
  2157             ABASE = UNSAFE.arrayBaseOffset(a);
  2158             s = UNSAFE.arrayIndexScale(a);
  2159         } catch (Exception e) {
  2160             throw new Error(e);
  2161         }
  2162         if ((s & (s-1)) != 0)
  2163             throw new Error("data type scale not a power of two");
  2164         ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
  2165     }
  2166 
  2167 }