rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinPool.java
branch jdk7-b147
changeset 1890 212417b74b72
child 1895 bfaf3300b7ba
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinPool.java	Sat Mar 19 10:46:31 2016 +0100
     1.3 @@ -0,0 +1,2177 @@
     1.4 +/*
     1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     1.6 + *
     1.7 + * This code is free software; you can redistribute it and/or modify it
     1.8 + * under the terms of the GNU General Public License version 2 only, as
     1.9 + * published by the Free Software Foundation.  Oracle designates this
    1.10 + * particular file as subject to the "Classpath" exception as provided
    1.11 + * by Oracle in the LICENSE file that accompanied this code.
    1.12 + *
    1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
    1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    1.15 + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    1.16 + * version 2 for more details (a copy is included in the LICENSE file that
    1.17 + * accompanied this code).
    1.18 + *
    1.19 + * You should have received a copy of the GNU General Public License version
    1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
    1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    1.22 + *
    1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    1.24 + * or visit www.oracle.com if you need additional information or have any
    1.25 + * questions.
    1.26 + */
    1.27 +
    1.28 +/*
    1.29 + * This file is available under and governed by the GNU General Public
    1.30 + * License version 2 only, as published by the Free Software Foundation.
    1.31 + * However, the following notice accompanied the original version of this
    1.32 + * file:
    1.33 + *
    1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
    1.35 + * Expert Group and released to the public domain, as explained at
    1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
    1.37 + */
    1.38 +
    1.39 +package java.util.concurrent;
    1.40 +
    1.41 +import java.util.ArrayList;
    1.42 +import java.util.Arrays;
    1.43 +import java.util.Collection;
    1.44 +import java.util.Collections;
    1.45 +import java.util.List;
    1.46 +import java.util.Random;
    1.47 +import java.util.concurrent.AbstractExecutorService;
    1.48 +import java.util.concurrent.Callable;
    1.49 +import java.util.concurrent.ExecutorService;
    1.50 +import java.util.concurrent.Future;
    1.51 +import java.util.concurrent.RejectedExecutionException;
    1.52 +import java.util.concurrent.RunnableFuture;
    1.53 +import java.util.concurrent.TimeUnit;
    1.54 +import java.util.concurrent.TimeoutException;
    1.55 +import java.util.concurrent.atomic.AtomicInteger;
    1.56 +import java.util.concurrent.locks.LockSupport;
    1.57 +import java.util.concurrent.locks.ReentrantLock;
    1.58 +import java.util.concurrent.locks.Condition;
    1.59 +
    1.60 +/**
    1.61 + * An {@link ExecutorService} for running {@link ForkJoinTask}s.
    1.62 + * A {@code ForkJoinPool} provides the entry point for submissions
    1.63 + * from non-{@code ForkJoinTask} clients, as well as management and
    1.64 + * monitoring operations.
    1.65 + *
    1.66 + * <p>A {@code ForkJoinPool} differs from other kinds of {@link
    1.67 + * ExecutorService} mainly by virtue of employing
    1.68 + * <em>work-stealing</em>: all threads in the pool attempt to find and
    1.69 + * execute subtasks created by other active tasks (eventually blocking
    1.70 + * waiting for work if none exist). This enables efficient processing
    1.71 + * when most tasks spawn other subtasks (as do most {@code
    1.72 + * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
    1.73 + * constructors, {@code ForkJoinPool}s may also be appropriate for use
    1.74 + * with event-style tasks that are never joined.
    1.75 + *
    1.76 + * <p>A {@code ForkJoinPool} is constructed with a given target
    1.77 + * parallelism level; by default, equal to the number of available
    1.78 + * processors. The pool attempts to maintain enough active (or
    1.79 + * available) threads by dynamically adding, suspending, or resuming
    1.80 + * internal worker threads, even if some tasks are stalled waiting to
    1.81 + * join others. However, no such adjustments are guaranteed in the
    1.82 + * face of blocked IO or other unmanaged synchronization. The nested
    1.83 + * {@link ManagedBlocker} interface enables extension of the kinds of
    1.84 + * synchronization accommodated.
    1.85 + *
    1.86 + * <p>In addition to execution and lifecycle control methods, this
    1.87 + * class provides status check methods (for example
    1.88 + * {@link #getStealCount}) that are intended to aid in developing,
    1.89 + * tuning, and monitoring fork/join applications. Also, method
    1.90 + * {@link #toString} returns indications of pool state in a
    1.91 + * convenient form for informal monitoring.
    1.92 + *
    1.93 + * <p> As is the case with other ExecutorServices, there are three
    1.94 + * main task execution methods summarized in the following
    1.95 + * table. These are designed to be used by clients not already engaged
    1.96 + * in fork/join computations in the current pool.  The main forms of
    1.97 + * these methods accept instances of {@code ForkJoinTask}, but
    1.98 + * overloaded forms also allow mixed execution of plain {@code
    1.99 + * Runnable}- or {@code Callable}- based activities as well.  However,
   1.100 + * tasks that are already executing in a pool should normally
   1.101 + * <em>NOT</em> use these pool execution methods, but instead use the
   1.102 + * within-computation forms listed in the table.
   1.103 + *
   1.104 + * <table BORDER CELLPADDING=3 CELLSPACING=1>
   1.105 + *  <tr>
   1.106 + *    <td></td>
   1.107 + *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
   1.108 + *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
   1.109 + *  </tr>
   1.110 + *  <tr>
   1.111 + *    <td> <b>Arrange async execution</td>
   1.112 + *    <td> {@link #execute(ForkJoinTask)}</td>
   1.113 + *    <td> {@link ForkJoinTask#fork}</td>
   1.114 + *  </tr>
   1.115 + *  <tr>
   1.116 + *    <td> <b>Await and obtain result</td>
   1.117 + *    <td> {@link #invoke(ForkJoinTask)}</td>
   1.118 + *    <td> {@link ForkJoinTask#invoke}</td>
   1.119 + *  </tr>
   1.120 + *  <tr>
   1.121 + *    <td> <b>Arrange exec and obtain Future</td>
   1.122 + *    <td> {@link #submit(ForkJoinTask)}</td>
   1.123 + *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
   1.124 + *  </tr>
   1.125 + * </table>
   1.126 + *
   1.127 + * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
   1.128 + * used for all parallel task execution in a program or subsystem.
   1.129 + * Otherwise, use would not usually outweigh the construction and
   1.130 + * bookkeeping overhead of creating a large set of threads. For
   1.131 + * example, a common pool could be used for the {@code SortTasks}
   1.132 + * illustrated in {@link RecursiveAction}. Because {@code
   1.133 + * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
   1.134 + * daemon} mode, there is typically no need to explicitly {@link
   1.135 + * #shutdown} such a pool upon program exit.
   1.136 + *
   1.137 + * <pre>
   1.138 + * static final ForkJoinPool mainPool = new ForkJoinPool();
   1.139 + * ...
   1.140 + * public void sort(long[] array) {
   1.141 + *   mainPool.invoke(new SortTask(array, 0, array.length));
   1.142 + * }
   1.143 + * </pre>
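          + *
          + * <p>For illustration only, a hypothetical {@code SumTask} (for
          + * example a {@link RecursiveTask} producing a {@code Long}) could be
          + * handed to the {@code mainPool} above to arrange execution and
          + * obtain a {@link Future} for its result:
          + *
          + * <pre>
          + * ForkJoinTask&lt;Long&gt; job =
          + *   mainPool.submit(new SumTask(array, 0, array.length));
          + * Long sum = job.join();
          + * </pre>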
   1.144 + *
   1.145 + * <p><b>Implementation notes</b>: This implementation restricts the
   1.146 + * maximum number of running threads to 32767. Attempts to create
   1.147 + * pools with greater than the maximum number result in
   1.148 + * {@code IllegalArgumentException}.
   1.149 + *
   1.150 + * <p>This implementation rejects submitted tasks (that is, by throwing
   1.151 + * {@link RejectedExecutionException}) only when the pool is shut down
   1.152 + * or internal resources have been exhausted.
   1.153 + *
   1.154 + * @since 1.7
   1.155 + * @author Doug Lea
   1.156 + */
   1.157 +public class ForkJoinPool extends AbstractExecutorService {
   1.158 +
   1.159 +    /*
   1.160 +     * Implementation Overview
   1.161 +     *
   1.162 +     * This class provides the central bookkeeping and control for a
   1.163 +     * set of worker threads: Submissions from non-FJ threads enter
   1.164 +     * into a submission queue. Workers take these tasks and typically
   1.165 +     * split them into subtasks that may be stolen by other workers.
   1.166 +     * Preference rules give first priority to processing tasks from
   1.167 +     * their own queues (LIFO or FIFO, depending on mode), then to
   1.168 +     * randomized FIFO steals of tasks in other worker queues, and
   1.169 +     * lastly to new submissions.
   1.170 +     *
   1.171 +     * The main throughput advantages of work-stealing stem from
   1.172 +     * decentralized control -- workers mostly take tasks from
   1.173 +     * themselves or each other. We cannot negate this in the
   1.174 +     * implementation of other management responsibilities. The main
   1.175 +     * tactic for avoiding bottlenecks is packing nearly all
   1.176 +     * essentially atomic control state into a single 64bit volatile
   1.177 +     * variable ("ctl"). This variable is read on the order of 10-100
   1.178 +     * times as often as it is modified (always via CAS). (There is
   1.179 +     * some additional control state, for example variable "shutdown"
   1.180 +     * for which we can cope with uncoordinated updates.)  This
   1.181 +     * streamlines synchronization and control at the expense of messy
   1.182 +     * constructions needed to repack status bits upon updates.
   1.183 +     * Updates tend not to contend with each other except during
   1.184 +     * bursts while submitted tasks begin or end.  In some cases when
   1.185 +     * they do contend, threads can instead do something else
   1.186 +     * (usually, scan for tasks) until contention subsides.
   1.187 +     *
   1.188 +     * To enable packing, we restrict maximum parallelism to (1<<15)-1
   1.189 +     * (which is far in excess of normal operating range) to allow
   1.190 +     * ids, counts, and their negations (used for thresholding) to fit
   1.191 +     * into 16bit fields.
   1.192 +     *
   1.193 +     * Recording Workers.  Workers are recorded in the "workers" array
   1.194 +     * that is created upon pool construction and expanded if (rarely)
   1.195 +     * necessary.  This is an array as opposed to some other data
   1.196 +     * structure to support index-based random steals by workers.
   1.197 +     * Updates to the array recording new workers and unrecording
   1.198 +     * terminated ones are protected from each other by a seqLock
   1.199 +     * (scanGuard) but the array is otherwise concurrently readable,
   1.200 +     * and accessed directly by workers. To simplify index-based
   1.201 +     * operations, the array size is always a power of two, and all
   1.202 +     * readers must tolerate null slots. To avoid flailing during
   1.203 +     * start-up, the array is presized to hold twice #parallelism
   1.204 +     * workers (which is unlikely to need further resizing during
   1.205 +     * execution). But to avoid dealing with so many null slots,
   1.206 +     * variable scanGuard includes a mask for the nearest power of two
   1.207 +     * that contains all current workers.  All worker thread creation
   1.208 +     * is on-demand, triggered by task submissions, replacement of
   1.209 +     * terminated workers, and/or compensation for blocked
   1.210 +     * workers. However, all other support code is set up to work with
   1.211 +     * other policies.  To ensure that we do not hold on to worker
   1.212 +     * references that would prevent GC, ALL accesses to workers are
   1.213 +     * via indices into the workers array (which is one source of some
   1.214 +     * of the messy code constructions here). In essence, the workers
   1.215 +     * array serves as a weak reference mechanism. Thus for example
   1.216 +     * the wait queue field of ctl stores worker indices, not worker
   1.217 +     * references.  Access to the workers in associated methods (for
   1.218 +     * example signalWork) must both index-check and null-check the
   1.219 +     * IDs. All such accesses ignore bad IDs by returning out early
   1.220 +     * from what they are doing, since this can only be associated
   1.221 +     * with termination, in which case it is OK to give up.
   1.222 +     *
   1.223 +     * All uses of the workers array, as well as queue arrays, check
   1.224 +     * that the array is non-null (even if previously non-null). This
   1.225 +     * allows nulling during termination, which is currently not
   1.226 +     * necessary, but remains an option for resource-revocation-based
   1.227 +     * shutdown schemes.
   1.228 +     *
   1.229 +     * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
   1.230 +     * let workers spin indefinitely scanning for tasks when none can
   1.231 +     * be found immediately, and we cannot start/resume workers unless
   1.232 +     * there appear to be tasks available.  On the other hand, we must
   1.233 +     * quickly prod them into action when new tasks are submitted or
   1.234 +     * generated.  We park/unpark workers after placing in an event
   1.235 +     * wait queue when they cannot find work. This "queue" is actually
   1.236 +     * a simple Treiber stack, headed by the "id" field of ctl, plus a
   1.237 +     * 15bit counter value to both wake up waiters (by advancing their
   1.238 +     * count) and avoid ABA effects. Successors are held in worker
   1.239 +     * field "nextWait".  Queuing deals with several intrinsic races,
   1.240 +     * mainly that a task-producing thread can miss seeing (and
   1.241 +     * signalling) another thread that gave up looking for work but
   1.242 +     * has not yet entered the wait queue. We solve this by requiring
   1.243 +     * a full sweep of all workers both before (in scan()) and after
   1.244 +     * (in tryAwaitWork()) a newly waiting worker is added to the wait
   1.245 +     * queue. During a rescan, the worker might release some other
   1.246 +     * queued worker rather than itself, which has the same net
   1.247 +     * effect. Because enqueued workers may actually be rescanning
   1.248 +     * rather than waiting, we set and clear the "parked" field of
   1.249 +     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
   1.250 +     * (Use of the parked field requires a secondary recheck to avoid
   1.251 +     * missed signals.)
   1.252 +     *
   1.253 +     * Signalling.  We create or wake up workers only when there
   1.254 +     * appears to be at least one task they might be able to find and
   1.255 +     * execute.  When a submission is added or another worker adds a
   1.256 +     * task to a queue that previously had two or fewer tasks, they
   1.257 +     * signal waiting workers (or trigger creation of new ones if
   1.258 +     * fewer than the given parallelism level -- see signalWork).
   1.259 +     * These primary signals are buttressed by signals during rescans
   1.260 +     * as well as those performed when a worker steals a task and
   1.261 +     * notices that there are more tasks too; together these cover the
   1.262 +     * signals needed in cases when more than two tasks are pushed
   1.263 +     * but untaken.
   1.264 +     *
   1.265 +     * Trimming workers. To release resources after periods of lack of
   1.266 +     * use, a worker starting to wait when the pool is quiescent will
   1.267 +     * time out and terminate if the pool has remained quiescent for
   1.268 +     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
   1.269 +     * terminating all workers after long periods of non-use.
   1.270 +     *
   1.271 +     * Submissions. External submissions are maintained in an
   1.272 +     * array-based queue that is structured identically to
   1.273 +     * ForkJoinWorkerThread queues except for the use of
   1.274 +     * submissionLock in method addSubmission. Unlike the case for
   1.275 +     * worker queues, multiple external threads can add new
   1.276 +     * submissions, so adding requires a lock.
   1.277 +     *
   1.278 +     * Compensation. Beyond work-stealing support and lifecycle
   1.279 +     * control, the main responsibility of this framework is to take
   1.280 +     * actions when one worker is waiting to join a task stolen (or
   1.281 +     * always held by) another.  Because we are multiplexing many
   1.282 +     * tasks on to a pool of workers, we can't just let them block (as
   1.283 +     * in Thread.join).  We also cannot just reassign the joiner's
   1.284 +     * run-time stack with another and replace it later, which would
   1.285 +     * be a form of "continuation", that even if possible is not
   1.286 +     * necessarily a good idea since we sometimes need both an
   1.287 +     * unblocked task and its continuation to progress. Instead we
   1.288 +     * combine two tactics:
   1.289 +     *
   1.290 +     *   Helping: Arranging for the joiner to execute some task that it
   1.291 +     *      would be running if the steal had not occurred.  Method
   1.292 +     *      ForkJoinWorkerThread.joinTask tracks joining->stealing
   1.293 +     *      links to try to find such a task.
   1.294 +     *
   1.295 +     *   Compensating: Unless there are already enough live threads,
   1.296 +     *      method tryPreBlock() may create or re-activate a spare
   1.297 +     *      thread to compensate for blocked joiners until they
   1.298 +     *      unblock.
   1.299 +     *
   1.300 +     * The ManagedBlocker extension API can't use helping so relies
   1.301 +     * only on compensation in method awaitBlocker.
   1.302 +     *
   1.303 +     * It is impossible to keep exactly the target parallelism number
   1.304 +     * of threads running at any given time.  Determining the
   1.305 +     * existence of conservatively safe helping targets, the
   1.306 +     * availability of already-created spares, and the apparent need
   1.307 +     * to create new spares are all racy and require heuristic
   1.308 +     * guidance, so we rely on multiple retries of each.  Currently,
   1.309 +     * in keeping with on-demand signalling policy, we compensate only
   1.310 +     * if blocking would leave less than one active (non-waiting,
   1.311 +     * non-blocked) worker. Additionally, to avoid some false alarms
   1.312 +     * due to GC, lagging counters, system activity, etc, compensated
   1.313 +     * blocking for joins is only attempted after rechecks stabilize
   1.314 +     * (retries are interspersed with Thread.yield, for good
   1.315 +     * citizenship).  The variable blockedCount, incremented before
   1.316 +     * blocking and decremented after, is sometimes needed to
   1.317 +     * distinguish cases of waiting for work vs blocking on joins or
   1.318 +     * other managed sync. Both cases are equivalent for most pool
   1.319 +     * control, so we can update non-atomically. (Additionally,
   1.320 +     * contention on blockedCount alleviates some contention on ctl).
   1.321 +     *
   1.322 +     * Shutdown and Termination. A call to shutdownNow atomically sets
    1.323 +     * the ctl stop bit and then (non-atomically) sets each worker's
   1.324 +     * "terminate" status, cancels all unprocessed tasks, and wakes up
   1.325 +     * all waiting workers.  Detecting whether termination should
   1.326 +     * commence after a non-abrupt shutdown() call requires more work
    1.327 +     * and bookkeeping. We need consensus about quiescence (i.e., that
   1.328 +     * there is no more work) which is reflected in active counts so
   1.329 +     * long as there are no current blockers, as well as possible
   1.330 +     * re-evaluations during independent changes in blocking or
   1.331 +     * quiescing workers.
   1.332 +     *
   1.333 +     * Style notes: There is a lot of representation-level coupling
   1.334 +     * among classes ForkJoinPool, ForkJoinWorkerThread, and
   1.335 +     * ForkJoinTask.  Most fields of ForkJoinWorkerThread maintain
   1.336 +     * data structures managed by ForkJoinPool, so are directly
   1.337 +     * accessed.  Conversely we allow access to "workers" array by
   1.338 +     * workers, and direct access to ForkJoinTask.status by both
   1.339 +     * ForkJoinPool and ForkJoinWorkerThread.  There is little point
   1.340 +     * trying to reduce this, since any associated future changes in
   1.341 +     * representations will need to be accompanied by algorithmic
   1.342 +     * changes anyway. All together, these low-level implementation
   1.343 +     * choices produce as much as a factor of 4 performance
   1.344 +     * improvement compared to naive implementations, and enable the
   1.345 +     * processing of billions of tasks per second, at the expense of
   1.346 +     * some ugliness.
   1.347 +     *
   1.348 +     * Methods signalWork() and scan() are the main bottlenecks so are
   1.349 +     * especially heavily micro-optimized/mangled.  There are lots of
   1.350 +     * inline assignments (of form "while ((local = field) != 0)")
   1.351 +     * which are usually the simplest way to ensure the required read
   1.352 +     * orderings (which are sometimes critical). This leads to a
   1.353 +     * "C"-like style of listing declarations of these locals at the
   1.354 +     * heads of methods or blocks.  There are several occurrences of
   1.355 +     * the unusual "do {} while (!cas...)"  which is the simplest way
   1.356 +     * to force an update of a CAS'ed variable. There are also other
   1.357 +     * coding oddities that help some methods perform reasonably even
   1.358 +     * when interpreted (not compiled).
   1.359 +     *
   1.360 +     * The order of declarations in this file is: (1) declarations of
   1.361 +     * statics (2) fields (along with constants used when unpacking
   1.362 +     * some of them), listed in an order that tends to reduce
   1.363 +     * contention among them a bit under most JVMs.  (3) internal
   1.364 +     * control methods (4) callbacks and other support for
   1.365 +     * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
   1.366 +     * methods (plus a few little helpers). (6) static block
   1.367 +     * initializing all statics in a minimally dependent order.
   1.368 +     */
   1.369 +
   1.370 +    /**
   1.371 +     * Factory for creating new {@link ForkJoinWorkerThread}s.
   1.372 +     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
   1.373 +     * for {@code ForkJoinWorkerThread} subclasses that extend base
   1.374 +     * functionality or initialize threads with different contexts.
   1.375 +     */
   1.376 +    public static interface ForkJoinWorkerThreadFactory {
   1.377 +        /**
   1.378 +         * Returns a new worker thread operating in the given pool.
   1.379 +         *
   1.380 +         * @param pool the pool this thread works in
   1.381 +         * @throws NullPointerException if the pool is null
   1.382 +         */
   1.383 +        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
   1.384 +    }
   1.385 +
   1.386 +    /**
   1.387 +     * Default ForkJoinWorkerThreadFactory implementation; creates a
   1.388 +     * new ForkJoinWorkerThread.
   1.389 +     */
   1.390 +    static class DefaultForkJoinWorkerThreadFactory
   1.391 +        implements ForkJoinWorkerThreadFactory {
   1.392 +        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
   1.393 +            return new ForkJoinWorkerThread(pool);
   1.394 +        }
   1.395 +    }
   1.396 +
   1.397 +    /**
   1.398 +     * Creates a new ForkJoinWorkerThread. This factory is used unless
   1.399 +     * overridden in ForkJoinPool constructors.
   1.400 +     */
   1.401 +    public static final ForkJoinWorkerThreadFactory
   1.402 +        defaultForkJoinWorkerThreadFactory;
   1.403 +
   1.404 +    /**
   1.405 +     * Permission required for callers of methods that may start or
   1.406 +     * kill threads.
   1.407 +     */
   1.408 +    private static final RuntimePermission modifyThreadPermission;
   1.409 +
   1.410 +    /**
   1.411 +     * If there is a security manager, makes sure caller has
   1.412 +     * permission to modify threads.
   1.413 +     */
   1.414 +    private static void checkPermission() {
   1.415 +        SecurityManager security = System.getSecurityManager();
   1.416 +        if (security != null)
   1.417 +            security.checkPermission(modifyThreadPermission);
   1.418 +    }
   1.419 +
   1.420 +    /**
   1.421 +     * Generator for assigning sequence numbers as pool names.
   1.422 +     */
   1.423 +    private static final AtomicInteger poolNumberGenerator;
   1.424 +
   1.425 +    /**
   1.426 +     * Generator for initial random seeds for worker victim
   1.427 +     * selection. This is used only to create initial seeds. Random
   1.428 +     * steals use a cheaper xorshift generator per steal attempt. We
   1.429 +     * don't expect much contention on seedGenerator, so just use a
   1.430 +     * plain Random.
   1.431 +     */
   1.432 +    static final Random workerSeedGenerator;
   1.433 +
   1.434 +    /**
   1.435 +     * Array holding all worker threads in the pool.  Initialized upon
   1.436 +     * construction. Array size must be a power of two.  Updates and
   1.437 +     * replacements are protected by scanGuard, but the array is
   1.438 +     * always kept in a consistent enough state to be randomly
   1.439 +     * accessed without locking by workers performing work-stealing,
   1.440 +     * as well as other traversal-based methods in this class, so long
   1.441 +     * as reads memory-acquire by first reading ctl. All readers must
   1.442 +     * tolerate that some array slots may be null.
   1.443 +     */
   1.444 +    ForkJoinWorkerThread[] workers;
   1.445 +
   1.446 +    /**
   1.447 +     * Initial size for submission queue array. Must be a power of
   1.448 +     * two.  In many applications, these always stay small so we use a
   1.449 +     * small initial cap.
   1.450 +     */
   1.451 +    private static final int INITIAL_QUEUE_CAPACITY = 8;
   1.452 +
   1.453 +    /**
   1.454 +     * Maximum size for submission queue array. Must be a power of two
   1.455 +     * less than or equal to 1 << (31 - width of array entry) to
   1.456 +     * ensure lack of index wraparound, but is capped at a lower
   1.457 +     * value to help users trap runaway computations.
   1.458 +     */
   1.459 +    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
   1.460 +
   1.461 +    /**
   1.462 +     * Array serving as submission queue. Initialized upon construction.
   1.463 +     */
   1.464 +    private ForkJoinTask<?>[] submissionQueue;
   1.465 +
   1.466 +    /**
   1.467 +     * Lock protecting submissions array for addSubmission
   1.468 +     */
   1.469 +    private final ReentrantLock submissionLock;
   1.470 +
   1.471 +    /**
   1.472 +     * Condition for awaitTermination, using submissionLock for
   1.473 +     * convenience.
   1.474 +     */
   1.475 +    private final Condition termination;
   1.476 +
   1.477 +    /**
   1.478 +     * Creation factory for worker threads.
   1.479 +     */
   1.480 +    private final ForkJoinWorkerThreadFactory factory;
   1.481 +
   1.482 +    /**
   1.483 +     * The uncaught exception handler used when any worker abruptly
   1.484 +     * terminates.
   1.485 +     */
   1.486 +    final Thread.UncaughtExceptionHandler ueh;
   1.487 +
   1.488 +    /**
   1.489 +     * Prefix for assigning names to worker threads
   1.490 +     */
   1.491 +    private final String workerNamePrefix;
   1.492 +
   1.493 +    /**
   1.494 +     * Sum of per-thread steal counts, updated only when threads are
   1.495 +     * idle or terminating.
   1.496 +     */
   1.497 +    private volatile long stealCount;
   1.498 +
   1.499 +    /**
   1.500 +     * Main pool control -- a long packed with:
   1.501 +     * AC: Number of active running workers minus target parallelism (16 bits)
   1.502 +     * TC: Number of total workers minus target parallelism (16bits)
   1.503 +     * ST: true if pool is terminating (1 bit)
   1.504 +     * EC: the wait count of top waiting thread (15 bits)
   1.505 +     * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
   1.506 +     *
   1.507 +     * When convenient, we can extract the upper 32 bits of counts and
   1.508 +     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
   1.509 +     * (int)ctl.  The ec field is never accessed alone, but always
   1.510 +     * together with id and st. The offsets of counts by the target
    1.511 +     * parallelism and the positioning of fields make it possible to
   1.512 +     * perform the most common checks via sign tests of fields: When
   1.513 +     * ac is negative, there are not enough active workers, when tc is
   1.514 +     * negative, there are not enough total workers, when id is
   1.515 +     * negative, there is at least one waiting worker, and when e is
   1.516 +     * negative, the pool is terminating.  To deal with these possibly
   1.517 +     * negative fields, we use casts in and out of "short" and/or
   1.518 +     * signed shifts to maintain signedness.
   1.519 +     */
   1.520 +    volatile long ctl;
   1.521 +
   1.522 +    // bit positions/shifts for fields
   1.523 +    private static final int  AC_SHIFT   = 48;
   1.524 +    private static final int  TC_SHIFT   = 32;
   1.525 +    private static final int  ST_SHIFT   = 31;
   1.526 +    private static final int  EC_SHIFT   = 16;
   1.527 +
   1.528 +    // bounds
   1.529 +    private static final int  MAX_ID     = 0x7fff;  // max poolIndex
   1.530 +    private static final int  SMASK      = 0xffff;  // mask short bits
   1.531 +    private static final int  SHORT_SIGN = 1 << 15;
   1.532 +    private static final int  INT_SIGN   = 1 << 31;
   1.533 +
   1.534 +    // masks
   1.535 +    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
   1.536 +    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
   1.537 +    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
   1.538 +
   1.539 +    // units for incrementing and decrementing
   1.540 +    private static final long TC_UNIT    = 1L << TC_SHIFT;
   1.541 +    private static final long AC_UNIT    = 1L << AC_SHIFT;
   1.542 +
   1.543 +    // masks and units for dealing with u = (int)(ctl >>> 32)
   1.544 +    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
   1.545 +    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
   1.546 +    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
   1.547 +    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
   1.548 +    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
   1.549 +    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
   1.550 +
   1.551 +    // masks and units for dealing with e = (int)ctl
   1.552 +    private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
   1.553 +    private static final int  EC_UNIT    = 1 << EC_SHIFT;
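          +
          +    /*
          +     * Unpacking sketch (mirrors the ctl documentation above; not used
          +     * by the implementation itself):
          +     *
          +     *   int u  = (int)(ctl >>> 32);          // upper half: AC and TC
          +     *   int e  = (int)ctl;                   // lower half: ST, EC and ID
          +     *   int ac = (int)(ctl >> AC_SHIFT);     // active count minus parallelism
          +     *   int tc = (short)(u >>> UTC_SHIFT);   // total count minus parallelism
          +     *
          +     * Sign tests then express the common checks: ac < 0 means too few
          +     * active workers, tc < 0 too few total workers, e < 0 terminating,
          +     * and e > 0 at least one waiting worker (at index ~e & SMASK).
          +     */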
   1.554 +
   1.555 +    /**
   1.556 +     * The target parallelism level.
   1.557 +     */
   1.558 +    final int parallelism;
   1.559 +
   1.560 +    /**
   1.561 +     * Index (mod submission queue length) of next element to take
   1.562 +     * from submission queue. Usage is identical to that for
   1.563 +     * per-worker queues -- see ForkJoinWorkerThread internal
   1.564 +     * documentation.
   1.565 +     */
   1.566 +    volatile int queueBase;
   1.567 +
   1.568 +    /**
   1.569 +     * Index (mod submission queue length) of next element to add
   1.570 +     * in submission queue. Usage is identical to that for
   1.571 +     * per-worker queues -- see ForkJoinWorkerThread internal
   1.572 +     * documentation.
   1.573 +     */
   1.574 +    int queueTop;
   1.575 +
   1.576 +    /**
   1.577 +     * True when shutdown() has been called.
   1.578 +     */
   1.579 +    volatile boolean shutdown;
   1.580 +
   1.581 +    /**
    1.582 +     * True if using local fifo, not default lifo, for local polling.
    1.583 +     * Read by, and replicated by, ForkJoinWorkerThreads.
   1.584 +     */
   1.585 +    final boolean locallyFifo;
   1.586 +
   1.587 +    /**
    1.588 +     * The number of threads in ForkJoinWorkerThread.helpQuiescePool.
   1.589 +     * When non-zero, suppresses automatic shutdown when active
   1.590 +     * counts become zero.
   1.591 +     */
   1.592 +    volatile int quiescerCount;
   1.593 +
   1.594 +    /**
   1.595 +     * The number of threads blocked in join.
   1.596 +     */
   1.597 +    volatile int blockedCount;
   1.598 +
   1.599 +    /**
   1.600 +     * Counter for worker Thread names (unrelated to their poolIndex)
   1.601 +     */
   1.602 +    private volatile int nextWorkerNumber;
   1.603 +
   1.604 +    /**
   1.605 +     * The index for the next created worker. Accessed under scanGuard.
   1.606 +     */
   1.607 +    private int nextWorkerIndex;
   1.608 +
   1.609 +    /**
   1.610 +     * SeqLock and index masking for updates to workers array.  Locked
   1.611 +     * when SG_UNIT is set. Unlocking clears bit by adding
   1.612 +     * SG_UNIT. Staleness of read-only operations can be checked by
    1.613 +     * comparing scanGuard to its value before the reads. The low 16 bits
    1.614 +     * (i.e., anding with SMASK) hold the smallest power of two
    1.615 +     * covering all worker indices, minus one, and are used to avoid
   1.616 +     * dealing with large numbers of null slots when the workers array
   1.617 +     * is overallocated.
   1.618 +     */
   1.619 +    volatile int scanGuard;
   1.620 +
   1.621 +    private static final int SG_UNIT = 1 << 16;
   1.622 +
   1.623 +    /**
   1.624 +     * The wakeup interval (in nanoseconds) for a worker waiting for a
   1.625 +     * task when the pool is quiescent to instead try to shrink the
   1.626 +     * number of workers.  The exact value does not matter too
   1.627 +     * much. It must be short enough to release resources during
   1.628 +     * sustained periods of idleness, but not so short that threads
   1.629 +     * are continually re-created.
   1.630 +     */
   1.631 +    private static final long SHRINK_RATE =
   1.632 +        4L * 1000L * 1000L * 1000L; // 4 seconds
   1.633 +
   1.634 +    /**
   1.635 +     * Top-level loop for worker threads: On each step: if the
   1.636 +     * previous step swept through all queues and found no tasks, or
   1.637 +     * there are excess threads, then possibly blocks. Otherwise,
   1.638 +     * scans for and, if found, executes a task. Returns when pool
   1.639 +     * and/or worker terminate.
   1.640 +     *
   1.641 +     * @param w the worker
   1.642 +     */
   1.643 +    final void work(ForkJoinWorkerThread w) {
   1.644 +        boolean swept = false;                // true on empty scans
   1.645 +        long c;
   1.646 +        while (!w.terminate && (int)(c = ctl) >= 0) {
   1.647 +            int a;                            // active count
   1.648 +            if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
   1.649 +                swept = scan(w, a);
   1.650 +            else if (tryAwaitWork(w, c))
   1.651 +                swept = false;
   1.652 +        }
   1.653 +    }
   1.654 +
   1.655 +    // Signalling
   1.656 +
   1.657 +    /**
   1.658 +     * Wakes up or creates a worker.
   1.659 +     */
   1.660 +    final void signalWork() {
   1.661 +        /*
    1.662 +         * The while condition is true if: (there are too few total
   1.663 +         * workers OR there is at least one waiter) AND (there are too
   1.664 +         * few active workers OR the pool is terminating).  The value
   1.665 +         * of e distinguishes the remaining cases: zero (no waiters)
   1.666 +         * for create, negative if terminating (in which case do
   1.667 +         * nothing), else release a waiter. The secondary checks for
   1.668 +         * release (non-null array etc) can fail if the pool begins
   1.669 +         * terminating after the test, and don't impose any added cost
   1.670 +         * because JVMs must perform null and bounds checks anyway.
   1.671 +         */
   1.672 +        long c; int e, u;
   1.673 +        while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
   1.674 +                (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
   1.675 +            if (e > 0) {                         // release a waiting worker
   1.676 +                int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
   1.677 +                if ((ws = workers) == null ||
   1.678 +                    (i = ~e & SMASK) >= ws.length ||
   1.679 +                    (w = ws[i]) == null)
   1.680 +                    break;
   1.681 +                long nc = (((long)(w.nextWait & E_MASK)) |
   1.682 +                           ((long)(u + UAC_UNIT) << 32));
   1.683 +                if (w.eventCount == e &&
   1.684 +                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   1.685 +                    w.eventCount = (e + EC_UNIT) & E_MASK;
   1.686 +                    if (w.parked)
   1.687 +                        UNSAFE.unpark(w);
   1.688 +                    break;
   1.689 +                }
   1.690 +            }
   1.691 +            else if (UNSAFE.compareAndSwapLong
   1.692 +                     (this, ctlOffset, c,
   1.693 +                      (long)(((u + UTC_UNIT) & UTC_MASK) |
   1.694 +                             ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
   1.695 +                addWorker();
   1.696 +                break;
   1.697 +            }
   1.698 +        }
   1.699 +    }
   1.700 +
   1.701 +    /**
   1.702 +     * Variant of signalWork to help release waiters on rescans.
   1.703 +     * Tries once to release a waiter if active count < 0.
   1.704 +     *
   1.705 +     * @return false if failed due to contention, else true
   1.706 +     */
   1.707 +    private boolean tryReleaseWaiter() {
   1.708 +        long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
   1.709 +        if ((e = (int)(c = ctl)) > 0 &&
   1.710 +            (int)(c >> AC_SHIFT) < 0 &&
   1.711 +            (ws = workers) != null &&
   1.712 +            (i = ~e & SMASK) < ws.length &&
   1.713 +            (w = ws[i]) != null) {
   1.714 +            long nc = ((long)(w.nextWait & E_MASK) |
   1.715 +                       ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
   1.716 +            if (w.eventCount != e ||
   1.717 +                !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
   1.718 +                return false;
   1.719 +            w.eventCount = (e + EC_UNIT) & E_MASK;
   1.720 +            if (w.parked)
   1.721 +                UNSAFE.unpark(w);
   1.722 +        }
   1.723 +        return true;
   1.724 +    }
   1.725 +
   1.726 +    // Scanning for tasks
   1.727 +
   1.728 +    /**
   1.729 +     * Scans for and, if found, executes one task. Scans start at a
    1.730 +     * random index of the workers array, randomly select the first
    1.731 +     * (2*#workers)-1 probes, and then, if all are empty, resort to 2
    1.732 +     * circular sweeps, which are necessary to check quiescence,
    1.733 +     * taking a submission only if no stealable tasks were found.  The
    1.734 +     * steal code inside the loop is a specialized form of
    1.735 +     * ForkJoinWorkerThread.deqTask, followed by bookkeeping to support
   1.736 +     * helpJoinTask and signal propagation. The code for submission
   1.737 +     * queues is almost identical. On each steal, the worker completes
   1.738 +     * not only the task, but also all local tasks that this task may
   1.739 +     * have generated. On detecting staleness or contention when
   1.740 +     * trying to take a task, this method returns without finishing
   1.741 +     * sweep, which allows global state rechecks before retry.
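          +     * The per-probe victim selection uses a Marsaglia xorshift update
          +     * (r ^= r << 13; r ^= r >>> 17; r ^= r << 5), the cheap per-steal
          +     * generator mentioned in the workerSeedGenerator documentation.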
   1.742 +     *
   1.743 +     * @param w the worker
   1.744 +     * @param a the number of active workers
   1.745 +     * @return true if swept all queues without finding a task
   1.746 +     */
   1.747 +    private boolean scan(ForkJoinWorkerThread w, int a) {
   1.748 +        int g = scanGuard; // mask 0 avoids useless scans if only one active
   1.749 +        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
   1.750 +        ForkJoinWorkerThread[] ws = workers;
   1.751 +        if (ws == null || ws.length <= m)         // staleness check
   1.752 +            return false;
   1.753 +        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
   1.754 +            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
   1.755 +            ForkJoinWorkerThread v = ws[k & m];
   1.756 +            if (v != null && (b = v.queueBase) != v.queueTop &&
   1.757 +                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
   1.758 +                long u = (i << ASHIFT) + ABASE;
   1.759 +                if ((t = q[i]) != null && v.queueBase == b &&
   1.760 +                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
   1.761 +                    int d = (v.queueBase = b + 1) - v.queueTop;
   1.762 +                    v.stealHint = w.poolIndex;
   1.763 +                    if (d != 0)
   1.764 +                        signalWork();             // propagate if nonempty
   1.765 +                    w.execTask(t);
   1.766 +                }
   1.767 +                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
   1.768 +                return false;                     // store next seed
   1.769 +            }
   1.770 +            else if (j < 0) {                     // xorshift
   1.771 +                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
   1.772 +            }
   1.773 +            else
   1.774 +                ++k;
   1.775 +        }
   1.776 +        if (scanGuard != g)                       // staleness check
   1.777 +            return false;
   1.778 +        else {                                    // try to take submission
   1.779 +            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
   1.780 +            if ((b = queueBase) != queueTop &&
   1.781 +                (q = submissionQueue) != null &&
   1.782 +                (i = (q.length - 1) & b) >= 0) {
   1.783 +                long u = (i << ASHIFT) + ABASE;
   1.784 +                if ((t = q[i]) != null && queueBase == b &&
   1.785 +                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
   1.786 +                    queueBase = b + 1;
   1.787 +                    w.execTask(t);
   1.788 +                }
   1.789 +                return false;
   1.790 +            }
   1.791 +            return true;                         // all queues empty
   1.792 +        }
   1.793 +    }
   1.794 +
   1.795 +    /**
   1.796 +     * Tries to enqueue worker w in wait queue and await change in
   1.797 +     * worker's eventCount.  If the pool is quiescent and there is
   1.798 +     * more than one worker, possibly terminates worker upon exit.
   1.799 +     * Otherwise, before blocking, rescans queues to avoid missed
   1.800 +     * signals.  Upon finding work, releases at least one worker
   1.801 +     * (which may be the current worker). Rescans restart upon
   1.802 +     * detected staleness or failure to release due to
   1.803 +     * contention. Note the unusual conventions about Thread.interrupt
   1.804 +     * here and elsewhere: Because interrupts are used solely to alert
   1.805 +     * threads to check termination, which is checked here anyway, we
   1.806 +     * clear status (using Thread.interrupted) before any call to
   1.807 +     * park, so that park does not immediately return due to status
   1.808 +     * being set via some other unrelated call to interrupt in user
   1.809 +     * code.
   1.810 +     *
   1.811 +     * @param w the calling worker
   1.812 +     * @param c the ctl value on entry
   1.813 +     * @return true if waited or another thread was released upon enq
   1.814 +     */
   1.815 +    private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
   1.816 +        int v = w.eventCount;
   1.817 +        w.nextWait = (int)c;                      // w's successor record
   1.818 +        long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
   1.819 +        if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   1.820 +            long d = ctl; // return true if lost to a deq, to force scan
   1.821 +            return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
   1.822 +        }
   1.823 +        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
   1.824 +            long s = stealCount;
   1.825 +            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
   1.826 +                sc = w.stealCount = 0;
   1.827 +            else if (w.eventCount != v)
   1.828 +                return true;                      // update next time
   1.829 +        }
   1.830 +        if ((!shutdown || !tryTerminate(false)) &&
   1.831 +            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
   1.832 +            blockedCount == 0 && quiescerCount == 0)
   1.833 +            idleAwaitWork(w, nc, c, v);           // quiescent
   1.834 +        for (boolean rescanned = false;;) {
   1.835 +            if (w.eventCount != v)
   1.836 +                return true;
   1.837 +            if (!rescanned) {
   1.838 +                int g = scanGuard, m = g & SMASK;
   1.839 +                ForkJoinWorkerThread[] ws = workers;
   1.840 +                if (ws != null && m < ws.length) {
   1.841 +                    rescanned = true;
   1.842 +                    for (int i = 0; i <= m; ++i) {
   1.843 +                        ForkJoinWorkerThread u = ws[i];
   1.844 +                        if (u != null) {
   1.845 +                            if (u.queueBase != u.queueTop &&
   1.846 +                                !tryReleaseWaiter())
   1.847 +                                rescanned = false; // contended
   1.848 +                            if (w.eventCount != v)
   1.849 +                                return true;
   1.850 +                        }
   1.851 +                    }
   1.852 +                }
   1.853 +                if (scanGuard != g ||              // stale
   1.854 +                    (queueBase != queueTop && !tryReleaseWaiter()))
   1.855 +                    rescanned = false;
   1.856 +                if (!rescanned)
   1.857 +                    Thread.yield();                // reduce contention
   1.858 +                else
   1.859 +                    Thread.interrupted();          // clear before park
   1.860 +            }
   1.861 +            else {
   1.862 +                w.parked = true;                   // must recheck
   1.863 +                if (w.eventCount != v) {
   1.864 +                    w.parked = false;
   1.865 +                    return true;
   1.866 +                }
   1.867 +                LockSupport.park(this);
   1.868 +                rescanned = w.parked = false;
   1.869 +            }
   1.870 +        }
   1.871 +    }
   1.872 +
   1.873 +    /**
   1.874 +     * If inactivating worker w has caused pool to become
   1.875 +     * quiescent, check for pool termination, and wait for event
   1.876 +     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
   1.877 +     * this case because quiescence reflects consensus about lack
   1.878 +     * of work). On timeout, if ctl has not changed, terminate the
   1.879 +     * worker. Upon its termination (see deregisterWorker), it may
   1.880 +     * wake up another worker to possibly repeat this process.
   1.881 +     *
   1.882 +     * @param w the calling worker
   1.883 +     * @param currentCtl the ctl value after enqueuing w
   1.884 +     * @param prevCtl the ctl value if w terminated
   1.885 +     * @param v the eventCount w awaits change
   1.886 +     */
   1.887 +    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
   1.888 +                               long prevCtl, int v) {
   1.889 +        if (w.eventCount == v) {
   1.890 +            if (shutdown)
   1.891 +                tryTerminate(false);
   1.892 +            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
   1.893 +            while (ctl == currentCtl) {
   1.894 +                long startTime = System.nanoTime();
   1.895 +                w.parked = true;
   1.896 +                if (w.eventCount == v)             // must recheck
   1.897 +                    LockSupport.parkNanos(this, SHRINK_RATE);
   1.898 +                w.parked = false;
   1.899 +                if (w.eventCount != v)
   1.900 +                    break;
   1.901 +                else if (System.nanoTime() - startTime <
   1.902 +                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
   1.903 +                    Thread.interrupted();          // spurious wakeup
   1.904 +                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
   1.905 +                                                   currentCtl, prevCtl)) {
   1.906 +                    w.terminate = true;            // restore previous
   1.907 +                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
   1.908 +                    break;
   1.909 +                }
   1.910 +            }
   1.911 +        }
   1.912 +    }
   1.913 +
   1.914 +    // Submissions
   1.915 +
   1.916 +    /**
   1.917 +     * Enqueues the given task in the submissionQueue.  Same idea as
   1.918 +     * ForkJoinWorkerThread.pushTask except for use of submissionLock.
   1.919 +     *
   1.920 +     * @param t the task
   1.921 +     */
   1.922 +    private void addSubmission(ForkJoinTask<?> t) {
   1.923 +        final ReentrantLock lock = this.submissionLock;
   1.924 +        lock.lock();
   1.925 +        try {
   1.926 +            ForkJoinTask<?>[] q; int s, m;
   1.927 +            if ((q = submissionQueue) != null) {    // ignore if queue removed
   1.928 +                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
   1.929 +                UNSAFE.putOrderedObject(q, u, t);
   1.930 +                queueTop = s + 1;
   1.931 +                if (s - queueBase == m)
   1.932 +                    growSubmissionQueue();
   1.933 +            }
   1.934 +        } finally {
   1.935 +            lock.unlock();
   1.936 +        }
   1.937 +        signalWork();
   1.938 +    }
   1.939 +
   1.940 +    //  (pollSubmission is defined below with exported methods)
   1.941 +
   1.942 +    /**
   1.943 +     * Creates or doubles submissionQueue array.
   1.944 +     * Basically identical to ForkJoinWorkerThread version.
   1.945 +     */
   1.946 +    private void growSubmissionQueue() {
   1.947 +        ForkJoinTask<?>[] oldQ = submissionQueue;
   1.948 +        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
   1.949 +        if (size > MAXIMUM_QUEUE_CAPACITY)
   1.950 +            throw new RejectedExecutionException("Queue capacity exceeded");
   1.951 +        if (size < INITIAL_QUEUE_CAPACITY)
   1.952 +            size = INITIAL_QUEUE_CAPACITY;
   1.953 +        ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
   1.954 +        int mask = size - 1;
   1.955 +        int top = queueTop;
   1.956 +        int oldMask;
   1.957 +        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
   1.958 +            for (int b = queueBase; b != top; ++b) {
   1.959 +                long u = ((b & oldMask) << ASHIFT) + ABASE;
   1.960 +                Object x = UNSAFE.getObjectVolatile(oldQ, u);
   1.961 +                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
   1.962 +                    UNSAFE.putObjectVolatile
   1.963 +                        (q, ((b & mask) << ASHIFT) + ABASE, x);
   1.964 +            }
   1.965 +        }
   1.966 +    }
   1.967 +
   1.968 +    // Blocking support
   1.969 +
   1.970 +    /**
   1.971 +     * Tries to increment blockedCount, decrement active count
   1.972 +     * (sometimes implicitly) and possibly release or create a
   1.973 +     * compensating worker in preparation for blocking. Fails
   1.974 +     * on contention or termination.
   1.975 +     *
   1.976 +     * @return true if the caller can block, else should recheck and retry
   1.977 +     */
   1.978 +    private boolean tryPreBlock() {
   1.979 +        int b = blockedCount;
   1.980 +        if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
   1.981 +            int pc = parallelism;
   1.982 +            do {
   1.983 +                ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
   1.984 +                int e, ac, tc, rc, i;
   1.985 +                long c = ctl;
   1.986 +                int u = (int)(c >>> 32);
   1.987 +                if ((e = (int)c) < 0) {
   1.988 +                                                 // skip -- terminating
   1.989 +                }
   1.990 +                else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
   1.991 +                         (ws = workers) != null &&
   1.992 +                         (i = ~e & SMASK) < ws.length &&
   1.993 +                         (w = ws[i]) != null) {
   1.994 +                    long nc = ((long)(w.nextWait & E_MASK) |
   1.995 +                               (c & (AC_MASK|TC_MASK)));
   1.996 +                    if (w.eventCount == e &&
   1.997 +                        UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
   1.998 +                        w.eventCount = (e + EC_UNIT) & E_MASK;
   1.999 +                        if (w.parked)
  1.1000 +                            UNSAFE.unpark(w);
  1.1001 +                        return true;             // release an idle worker
  1.1002 +                    }
  1.1003 +                }
  1.1004 +                else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
  1.1005 +                    long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
  1.1006 +                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
  1.1007 +                        return true;             // no compensation needed
  1.1008 +                }
  1.1009 +                else if (tc + pc < MAX_ID) {
  1.1010 +                    long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
  1.1011 +                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
  1.1012 +                        addWorker();
  1.1013 +                        return true;            // create a replacement
  1.1014 +                    }
  1.1015 +                }
  1.1016 +                // try to back out on any failure and let caller retry
  1.1017 +            } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
  1.1018 +                                               b = blockedCount, b - 1));
  1.1019 +        }
  1.1020 +        return false;
  1.1021 +    }
  1.1022 +
  1.1023 +    /**
  1.1024 +     * Decrements blockedCount and increments active count
  1.1025 +     */
  1.1026 +    private void postBlock() {
  1.1027 +        long c;
  1.1028 +        do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
  1.1029 +                                                c = ctl, c + AC_UNIT));
  1.1030 +        int b;
  1.1031 +        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
  1.1032 +                                               b = blockedCount, b - 1));
  1.1033 +    }
  1.1034 +
  1.1035 +    /**
  1.1036 +     * Possibly blocks waiting for the given task to complete, or
  1.1037 +     * cancels the task if terminating.  Fails to wait if contended.
  1.1038 +     *
  1.1039 +     * @param joinMe the task
  1.1040 +     */
  1.1041 +    final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
  1.1042 +        int s;
  1.1043 +        Thread.interrupted(); // clear interrupts before checking termination
  1.1044 +        if (joinMe.status >= 0) {
  1.1045 +            if (tryPreBlock()) {
  1.1046 +                joinMe.tryAwaitDone(0L);
  1.1047 +                postBlock();
  1.1048 +            }
  1.1049 +            else if ((ctl & STOP_BIT) != 0L)
  1.1050 +                joinMe.cancelIgnoringExceptions();
  1.1051 +        }
  1.1052 +    }
  1.1053 +
  1.1054 +    /**
  1.1055 +     * Possibly blocks the given worker waiting for joinMe to
   1.1056 +     * complete, or until the given timeout elapses.
  1.1057 +     *
  1.1058 +     * @param joinMe the task
   1.1059 +     * @param nanos the maximum time to wait, in nanoseconds
  1.1060 +     */
  1.1061 +    final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) {
  1.1062 +        while (joinMe.status >= 0) {
  1.1063 +            Thread.interrupted();
  1.1064 +            if ((ctl & STOP_BIT) != 0L) {
  1.1065 +                joinMe.cancelIgnoringExceptions();
  1.1066 +                break;
  1.1067 +            }
  1.1068 +            if (tryPreBlock()) {
  1.1069 +                long last = System.nanoTime();
  1.1070 +                while (joinMe.status >= 0) {
  1.1071 +                    long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
  1.1072 +                    if (millis <= 0)
  1.1073 +                        break;
  1.1074 +                    joinMe.tryAwaitDone(millis);
  1.1075 +                    if (joinMe.status < 0)
  1.1076 +                        break;
  1.1077 +                    if ((ctl & STOP_BIT) != 0L) {
  1.1078 +                        joinMe.cancelIgnoringExceptions();
  1.1079 +                        break;
  1.1080 +                    }
  1.1081 +                    long now = System.nanoTime();
  1.1082 +                    nanos -= now - last;
  1.1083 +                    last = now;
  1.1084 +                }
  1.1085 +                postBlock();
  1.1086 +                break;
  1.1087 +            }
  1.1088 +        }
  1.1089 +    }
  1.1090 +
  1.1091 +    /**
  1.1092 +     * If necessary, compensates for blocker, and blocks
  1.1093 +     */
  1.1094 +    private void awaitBlocker(ManagedBlocker blocker)
  1.1095 +        throws InterruptedException {
  1.1096 +        while (!blocker.isReleasable()) {
  1.1097 +            if (tryPreBlock()) {
  1.1098 +                try {
  1.1099 +                    do {} while (!blocker.isReleasable() && !blocker.block());
  1.1100 +                } finally {
  1.1101 +                    postBlock();
  1.1102 +                }
  1.1103 +                break;
  1.1104 +            }
  1.1105 +        }
  1.1106 +    }
  1.1107 +
   1.1108 +    // Creating, registering and deregistering workers
  1.1109 +
  1.1110 +    /**
  1.1111 +     * Tries to create and start a worker; minimally rolls back counts
  1.1112 +     * on failure.
  1.1113 +     */
  1.1114 +    private void addWorker() {
  1.1115 +        Throwable ex = null;
  1.1116 +        ForkJoinWorkerThread t = null;
  1.1117 +        try {
  1.1118 +            t = factory.newThread(this);
  1.1119 +        } catch (Throwable e) {
  1.1120 +            ex = e;
  1.1121 +        }
  1.1122 +        if (t == null) {  // null or exceptional factory return
  1.1123 +            long c;       // adjust counts
  1.1124 +            do {} while (!UNSAFE.compareAndSwapLong
  1.1125 +                         (this, ctlOffset, c = ctl,
  1.1126 +                          (((c - AC_UNIT) & AC_MASK) |
  1.1127 +                           ((c - TC_UNIT) & TC_MASK) |
  1.1128 +                           (c & ~(AC_MASK|TC_MASK)))));
  1.1129 +            // Propagate exception if originating from an external caller
  1.1130 +            if (!tryTerminate(false) && ex != null &&
  1.1131 +                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
  1.1132 +                UNSAFE.throwException(ex);
  1.1133 +        }
  1.1134 +        else
  1.1135 +            t.start();
  1.1136 +    }
  1.1137 +
  1.1138 +    /**
  1.1139 +     * Callback from ForkJoinWorkerThread constructor to assign a
  1.1140 +     * public name
  1.1141 +     */
  1.1142 +    final String nextWorkerName() {
  1.1143 +        for (int n;;) {
  1.1144 +            if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
  1.1145 +                                         n = nextWorkerNumber, ++n))
  1.1146 +                return workerNamePrefix + n;
  1.1147 +        }
  1.1148 +    }
  1.1149 +
  1.1150 +    /**
  1.1151 +     * Callback from ForkJoinWorkerThread constructor to
  1.1152 +     * determine its poolIndex and record in workers array.
  1.1153 +     *
  1.1154 +     * @param w the worker
  1.1155 +     * @return the worker's pool index
  1.1156 +     */
  1.1157 +    final int registerWorker(ForkJoinWorkerThread w) {
  1.1158 +        /*
  1.1159 +         * In the typical case, a new worker acquires the lock, uses
  1.1160 +         * next available index and returns quickly.  Since we should
  1.1161 +         * not block callers (ultimately from signalWork or
  1.1162 +         * tryPreBlock) waiting for the lock needed to do this, we
  1.1163 +         * instead help release other workers while waiting for the
  1.1164 +         * lock.
  1.1165 +         */
  1.1166 +        for (int g;;) {
  1.1167 +            ForkJoinWorkerThread[] ws;
  1.1168 +            if (((g = scanGuard) & SG_UNIT) == 0 &&
  1.1169 +                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
  1.1170 +                                         g, g | SG_UNIT)) {
  1.1171 +                int k = nextWorkerIndex;
  1.1172 +                try {
  1.1173 +                    if ((ws = workers) != null) { // ignore on shutdown
  1.1174 +                        int n = ws.length;
  1.1175 +                        if (k < 0 || k >= n || ws[k] != null) {
  1.1176 +                            for (k = 0; k < n && ws[k] != null; ++k)
  1.1177 +                                ;
  1.1178 +                            if (k == n)
  1.1179 +                                ws = workers = Arrays.copyOf(ws, n << 1);
  1.1180 +                        }
  1.1181 +                        ws[k] = w;
  1.1182 +                        nextWorkerIndex = k + 1;
  1.1183 +                        int m = g & SMASK;
  1.1184 +                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
  1.1185 +                    }
  1.1186 +                } finally {
  1.1187 +                    scanGuard = g;
  1.1188 +                }
  1.1189 +                return k;
  1.1190 +            }
  1.1191 +            else if ((ws = workers) != null) { // help release others
  1.1192 +                for (ForkJoinWorkerThread u : ws) {
  1.1193 +                    if (u != null && u.queueBase != u.queueTop) {
  1.1194 +                        if (tryReleaseWaiter())
  1.1195 +                            break;
  1.1196 +                    }
  1.1197 +                }
  1.1198 +            }
  1.1199 +        }
  1.1200 +    }
  1.1201 +
  1.1202 +    /**
  1.1203 +     * Final callback from terminating worker.  Removes record of
  1.1204 +     * worker from array, and adjusts counts. If pool is shutting
  1.1205 +     * down, tries to complete termination.
  1.1206 +     *
  1.1207 +     * @param w the worker
  1.1208 +     */
  1.1209 +    final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
  1.1210 +        int idx = w.poolIndex;
  1.1211 +        int sc = w.stealCount;
  1.1212 +        int steps = 0;
  1.1213 +        // Remove from array, adjust worker counts and collect steal count.
  1.1214 +        // We can intermix failed removes or adjusts with steal updates
  1.1215 +        do {
  1.1216 +            long s, c;
  1.1217 +            int g;
  1.1218 +            if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
  1.1219 +                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
  1.1220 +                                         g, g |= SG_UNIT)) {
  1.1221 +                ForkJoinWorkerThread[] ws = workers;
  1.1222 +                if (ws != null && idx >= 0 &&
  1.1223 +                    idx < ws.length && ws[idx] == w)
  1.1224 +                    ws[idx] = null;    // verify
  1.1225 +                nextWorkerIndex = idx;
  1.1226 +                scanGuard = g + SG_UNIT;
  1.1227 +                steps = 1;
  1.1228 +            }
  1.1229 +            if (steps == 1 &&
  1.1230 +                UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
  1.1231 +                                          (((c - AC_UNIT) & AC_MASK) |
  1.1232 +                                           ((c - TC_UNIT) & TC_MASK) |
  1.1233 +                                           (c & ~(AC_MASK|TC_MASK)))))
  1.1234 +                steps = 2;
  1.1235 +            if (sc != 0 &&
  1.1236 +                UNSAFE.compareAndSwapLong(this, stealCountOffset,
  1.1237 +                                          s = stealCount, s + sc))
  1.1238 +                sc = 0;
  1.1239 +        } while (steps != 2 || sc != 0);
  1.1240 +        if (!tryTerminate(false)) {
  1.1241 +            if (ex != null)   // possibly replace if died abnormally
  1.1242 +                signalWork();
  1.1243 +            else
  1.1244 +                tryReleaseWaiter();
  1.1245 +        }
  1.1246 +    }
  1.1247 +
  1.1248 +    // Shutdown and termination
  1.1249 +
  1.1250 +    /**
  1.1251 +     * Possibly initiates and/or completes termination.
  1.1252 +     *
  1.1253 +     * @param now if true, unconditionally terminate, else only
  1.1254 +     * if shutdown and empty queue and no active workers
  1.1255 +     * @return true if now terminating or terminated
  1.1256 +     */
  1.1257 +    private boolean tryTerminate(boolean now) {
  1.1258 +        long c;
  1.1259 +        while (((c = ctl) & STOP_BIT) == 0) {
  1.1260 +            if (!now) {
  1.1261 +                if ((int)(c >> AC_SHIFT) != -parallelism)
  1.1262 +                    return false;
  1.1263 +                if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
  1.1264 +                    queueBase != queueTop) {
  1.1265 +                    if (ctl == c) // staleness check
  1.1266 +                        return false;
  1.1267 +                    continue;
  1.1268 +                }
  1.1269 +            }
  1.1270 +            if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
  1.1271 +                startTerminating();
  1.1272 +        }
  1.1273 +        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
  1.1274 +            final ReentrantLock lock = this.submissionLock;
  1.1275 +            lock.lock();
  1.1276 +            try {
  1.1277 +                termination.signalAll();
  1.1278 +            } finally {
  1.1279 +                lock.unlock();
  1.1280 +            }
  1.1281 +        }
  1.1282 +        return true;
  1.1283 +    }
  1.1284 +
  1.1285 +    /**
  1.1286 +     * Runs up to three passes through workers: (0) Setting
   1.1287 +     * termination status for each worker, followed by waking up
   1.1288 +     * queued workers; (1) helping cancel tasks; (2) interrupting
  1.1289 +     * lagging threads (likely in external tasks, but possibly also
  1.1290 +     * blocked in joins).  Each pass repeats previous steps because of
  1.1291 +     * potential lagging thread creation.
  1.1292 +     */
  1.1293 +    private void startTerminating() {
  1.1294 +        cancelSubmissions();
  1.1295 +        for (int pass = 0; pass < 3; ++pass) {
  1.1296 +            ForkJoinWorkerThread[] ws = workers;
  1.1297 +            if (ws != null) {
  1.1298 +                for (ForkJoinWorkerThread w : ws) {
  1.1299 +                    if (w != null) {
  1.1300 +                        w.terminate = true;
  1.1301 +                        if (pass > 0) {
  1.1302 +                            w.cancelTasks();
  1.1303 +                            if (pass > 1 && !w.isInterrupted()) {
  1.1304 +                                try {
  1.1305 +                                    w.interrupt();
  1.1306 +                                } catch (SecurityException ignore) {
  1.1307 +                                }
  1.1308 +                            }
  1.1309 +                        }
  1.1310 +                    }
  1.1311 +                }
  1.1312 +                terminateWaiters();
  1.1313 +            }
  1.1314 +        }
  1.1315 +    }
  1.1316 +
  1.1317 +    /**
  1.1318 +     * Polls and cancels all submissions. Called only during termination.
  1.1319 +     */
  1.1320 +    private void cancelSubmissions() {
  1.1321 +        while (queueBase != queueTop) {
  1.1322 +            ForkJoinTask<?> task = pollSubmission();
  1.1323 +            if (task != null) {
  1.1324 +                try {
  1.1325 +                    task.cancel(false);
  1.1326 +                } catch (Throwable ignore) {
  1.1327 +                }
  1.1328 +            }
  1.1329 +        }
  1.1330 +    }
  1.1331 +
  1.1332 +    /**
  1.1333 +     * Tries to set the termination status of waiting workers, and
  1.1334 +     * then wakes them up (after which they will terminate).
  1.1335 +     */
  1.1336 +    private void terminateWaiters() {
  1.1337 +        ForkJoinWorkerThread[] ws = workers;
  1.1338 +        if (ws != null) {
  1.1339 +            ForkJoinWorkerThread w; long c; int i, e;
  1.1340 +            int n = ws.length;
  1.1341 +            while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
  1.1342 +                   (w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
  1.1343 +                if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
  1.1344 +                                              (long)(w.nextWait & E_MASK) |
  1.1345 +                                              ((c + AC_UNIT) & AC_MASK) |
  1.1346 +                                              (c & (TC_MASK|STOP_BIT)))) {
  1.1347 +                    w.terminate = true;
  1.1348 +                    w.eventCount = e + EC_UNIT;
  1.1349 +                    if (w.parked)
  1.1350 +                        UNSAFE.unpark(w);
  1.1351 +                }
  1.1352 +            }
  1.1353 +        }
  1.1354 +    }
  1.1355 +
  1.1356 +    // misc ForkJoinWorkerThread support
  1.1357 +
  1.1358 +    /**
  1.1359 +     * Increment or decrement quiescerCount. Needed only to prevent
  1.1360 +     * triggering shutdown if a worker is transiently inactive while
  1.1361 +     * checking quiescence.
  1.1362 +     *
  1.1363 +     * @param delta 1 for increment, -1 for decrement
  1.1364 +     */
  1.1365 +    final void addQuiescerCount(int delta) {
  1.1366 +        int c;
  1.1367 +        do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
  1.1368 +                                               c = quiescerCount, c + delta));
  1.1369 +    }
  1.1370 +
  1.1371 +    /**
  1.1372 +     * Directly increment or decrement active count without
  1.1373 +     * queuing. This method is used to transiently assert inactivation
  1.1374 +     * while checking quiescence.
  1.1375 +     *
  1.1376 +     * @param delta 1 for increment, -1 for decrement
  1.1377 +     */
  1.1378 +    final void addActiveCount(int delta) {
  1.1379 +        long d = delta < 0 ? -AC_UNIT : AC_UNIT;
  1.1380 +        long c;
  1.1381 +        do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
  1.1382 +                                                ((c + d) & AC_MASK) |
  1.1383 +                                                (c & ~AC_MASK)));
  1.1384 +    }
  1.1385 +
  1.1386 +    /**
  1.1387 +     * Returns the approximate (non-atomic) number of idle threads per
  1.1388 +     * active thread.
  1.1389 +     */
  1.1390 +    final int idlePerActive() {
  1.1391 +        // Approximate at powers of two for small values, saturate past 4
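          +        // Example: parallelism 8 with 3 active (5 idle): 3 > 4? no; 3 > 2? yes -> returns 1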
  1.1392 +        int p = parallelism;
  1.1393 +        int a = p + (int)(ctl >> AC_SHIFT);
  1.1394 +        return (a > (p >>>= 1) ? 0 :
  1.1395 +                a > (p >>>= 1) ? 1 :
  1.1396 +                a > (p >>>= 1) ? 2 :
  1.1397 +                a > (p >>>= 1) ? 4 :
  1.1398 +                8);
  1.1399 +    }
  1.1400 +
  1.1401 +    // Exported methods
  1.1402 +
  1.1403 +    // Constructors
  1.1404 +
  1.1405 +    /**
  1.1406 +     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
  1.1407 +     * java.lang.Runtime#availableProcessors}, using the {@linkplain
  1.1408 +     * #defaultForkJoinWorkerThreadFactory default thread factory},
  1.1409 +     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  1.1410 +     *
  1.1411 +     * @throws SecurityException if a security manager exists and
  1.1412 +     *         the caller is not permitted to modify threads
  1.1413 +     *         because it does not hold {@link
  1.1414 +     *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1.1415 +     */
  1.1416 +    public ForkJoinPool() {
  1.1417 +        this(Runtime.getRuntime().availableProcessors(),
  1.1418 +             defaultForkJoinWorkerThreadFactory, null, false);
  1.1419 +    }
  1.1420 +
  1.1421 +    /**
  1.1422 +     * Creates a {@code ForkJoinPool} with the indicated parallelism
  1.1423 +     * level, the {@linkplain
  1.1424 +     * #defaultForkJoinWorkerThreadFactory default thread factory},
  1.1425 +     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
  1.1426 +     *
  1.1427 +     * @param parallelism the parallelism level
  1.1428 +     * @throws IllegalArgumentException if parallelism less than or
  1.1429 +     *         equal to zero, or greater than implementation limit
  1.1430 +     * @throws SecurityException if a security manager exists and
  1.1431 +     *         the caller is not permitted to modify threads
  1.1432 +     *         because it does not hold {@link
  1.1433 +     *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1.1434 +     */
  1.1435 +    public ForkJoinPool(int parallelism) {
  1.1436 +        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
  1.1437 +    }
  1.1438 +
  1.1439 +    /**
  1.1440 +     * Creates a {@code ForkJoinPool} with the given parameters.
  1.1441 +     *
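          +     * <p>For example, the following sketch (variable names are
          +     * illustrative only) creates a pool equivalent to the no-arg
          +     * constructor but with async FIFO mode enabled:
          +     *  <pre> {@code
          +     * ForkJoinPool asyncPool = new ForkJoinPool(
          +     *     Runtime.getRuntime().availableProcessors(),
          +     *     ForkJoinPool.defaultForkJoinWorkerThreadFactory,
          +     *     null,   // no UncaughtExceptionHandler
          +     *     true);  // async (FIFO) mode for event-style tasks
          +     * }</pre>
          +     *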
  1.1442 +     * @param parallelism the parallelism level. For default value,
  1.1443 +     * use {@link java.lang.Runtime#availableProcessors}.
  1.1444 +     * @param factory the factory for creating new threads. For default value,
  1.1445 +     * use {@link #defaultForkJoinWorkerThreadFactory}.
  1.1446 +     * @param handler the handler for internal worker threads that
  1.1447 +     * terminate due to unrecoverable errors encountered while executing
  1.1448 +     * tasks. For default value, use {@code null}.
  1.1449 +     * @param asyncMode if true,
  1.1450 +     * establishes local first-in-first-out scheduling mode for forked
  1.1451 +     * tasks that are never joined. This mode may be more appropriate
  1.1452 +     * than default locally stack-based mode in applications in which
  1.1453 +     * worker threads only process event-style asynchronous tasks.
  1.1454 +     * For default value, use {@code false}.
  1.1455 +     * @throws IllegalArgumentException if parallelism less than or
  1.1456 +     *         equal to zero, or greater than implementation limit
  1.1457 +     * @throws NullPointerException if the factory is null
  1.1458 +     * @throws SecurityException if a security manager exists and
  1.1459 +     *         the caller is not permitted to modify threads
  1.1460 +     *         because it does not hold {@link
  1.1461 +     *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1.1462 +     */
  1.1463 +    public ForkJoinPool(int parallelism,
  1.1464 +                        ForkJoinWorkerThreadFactory factory,
  1.1465 +                        Thread.UncaughtExceptionHandler handler,
  1.1466 +                        boolean asyncMode) {
  1.1467 +        checkPermission();
  1.1468 +        if (factory == null)
  1.1469 +            throw new NullPointerException();
  1.1470 +        if (parallelism <= 0 || parallelism > MAX_ID)
  1.1471 +            throw new IllegalArgumentException();
  1.1472 +        this.parallelism = parallelism;
  1.1473 +        this.factory = factory;
  1.1474 +        this.ueh = handler;
  1.1475 +        this.locallyFifo = asyncMode;
  1.1476 +        long np = (long)(-parallelism); // offset ctl counts
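          +        // Both the AC and TC fields of ctl start at -parallelism and
          +        // reach zero once 'parallelism' workers have been created and are active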
  1.1477 +        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  1.1478 +        this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
  1.1479 +        // initialize workers array with room for 2*parallelism if possible
  1.1480 +        int n = parallelism << 1;
  1.1481 +        if (n >= MAX_ID)
  1.1482 +            n = MAX_ID;
   1.1483 +        else { // See Hacker's Delight, sec 3.2, where n < (1 << 16)
  1.1484 +            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
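          +            // e.g., parallelism 6: n = 12 rounds up to 15, giving a 16-slot array below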
  1.1485 +        }
  1.1486 +        workers = new ForkJoinWorkerThread[n + 1];
  1.1487 +        this.submissionLock = new ReentrantLock();
  1.1488 +        this.termination = submissionLock.newCondition();
  1.1489 +        StringBuilder sb = new StringBuilder("ForkJoinPool-");
  1.1490 +        sb.append(poolNumberGenerator.incrementAndGet());
  1.1491 +        sb.append("-worker-");
  1.1492 +        this.workerNamePrefix = sb.toString();
  1.1493 +    }
  1.1494 +
  1.1495 +    // Execution methods
  1.1496 +
  1.1497 +    /**
  1.1498 +     * Performs the given task, returning its result upon completion.
  1.1499 +     * If the computation encounters an unchecked Exception or Error,
  1.1500 +     * it is rethrown as the outcome of this invocation.  Rethrown
  1.1501 +     * exceptions behave in the same way as regular exceptions, but,
  1.1502 +     * when possible, contain stack traces (as displayed for example
  1.1503 +     * using {@code ex.printStackTrace()}) of both the current thread
  1.1504 +     * as well as the thread actually encountering the exception;
  1.1505 +     * minimally only the latter.
  1.1506 +     *
  1.1507 +     * @param task the task
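          +     * <p>A minimal usage sketch (the {@code Fibonacci} task is
          +     * hypothetical, shown only to illustrate the call):
          +     *  <pre> {@code
          +     * class Fibonacci extends RecursiveTask<Integer> {
          +     *   final int n;
          +     *   Fibonacci(int n) { this.n = n; }
          +     *   protected Integer compute() {
          +     *     if (n <= 1)
          +     *       return n;
          +     *     Fibonacci f1 = new Fibonacci(n - 1);
          +     *     f1.fork();
          +     *     return new Fibonacci(n - 2).compute() + f1.join();
          +     *   }
          +     * }
          +     * int fib10 = new ForkJoinPool().invoke(new Fibonacci(10));
          +     * }</pre>
          +     *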
  1.1508 +     * @return the task's result
  1.1509 +     * @throws NullPointerException if the task is null
  1.1510 +     * @throws RejectedExecutionException if the task cannot be
  1.1511 +     *         scheduled for execution
  1.1512 +     */
  1.1513 +    public <T> T invoke(ForkJoinTask<T> task) {
  1.1514 +        Thread t = Thread.currentThread();
  1.1515 +        if (task == null)
  1.1516 +            throw new NullPointerException();
  1.1517 +        if (shutdown)
  1.1518 +            throw new RejectedExecutionException();
  1.1519 +        if ((t instanceof ForkJoinWorkerThread) &&
  1.1520 +            ((ForkJoinWorkerThread)t).pool == this)
  1.1521 +            return task.invoke();  // bypass submit if in same pool
  1.1522 +        else {
  1.1523 +            addSubmission(task);
  1.1524 +            return task.join();
  1.1525 +        }
  1.1526 +    }
  1.1527 +
  1.1528 +    /**
  1.1529 +     * Unless terminating, forks task if within an ongoing FJ
  1.1530 +     * computation in the current pool, else submits as external task.
  1.1531 +     */
  1.1532 +    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
  1.1533 +        ForkJoinWorkerThread w;
  1.1534 +        Thread t = Thread.currentThread();
  1.1535 +        if (shutdown)
  1.1536 +            throw new RejectedExecutionException();
  1.1537 +        if ((t instanceof ForkJoinWorkerThread) &&
  1.1538 +            (w = (ForkJoinWorkerThread)t).pool == this)
  1.1539 +            w.pushTask(task);
  1.1540 +        else
  1.1541 +            addSubmission(task);
  1.1542 +    }
  1.1543 +
  1.1544 +    /**
  1.1545 +     * Arranges for (asynchronous) execution of the given task.
  1.1546 +     *
  1.1547 +     * @param task the task
  1.1548 +     * @throws NullPointerException if the task is null
  1.1549 +     * @throws RejectedExecutionException if the task cannot be
  1.1550 +     *         scheduled for execution
  1.1551 +     */
  1.1552 +    public void execute(ForkJoinTask<?> task) {
  1.1553 +        if (task == null)
  1.1554 +            throw new NullPointerException();
  1.1555 +        forkOrSubmit(task);
  1.1556 +    }
  1.1557 +
  1.1558 +    // AbstractExecutorService methods
  1.1559 +
  1.1560 +    /**
  1.1561 +     * @throws NullPointerException if the task is null
  1.1562 +     * @throws RejectedExecutionException if the task cannot be
  1.1563 +     *         scheduled for execution
  1.1564 +     */
  1.1565 +    public void execute(Runnable task) {
  1.1566 +        if (task == null)
  1.1567 +            throw new NullPointerException();
  1.1568 +        ForkJoinTask<?> job;
  1.1569 +        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
  1.1570 +            job = (ForkJoinTask<?>) task;
  1.1571 +        else
  1.1572 +            job = ForkJoinTask.adapt(task, null);
  1.1573 +        forkOrSubmit(job);
  1.1574 +    }
  1.1575 +
  1.1576 +    /**
  1.1577 +     * Submits a ForkJoinTask for execution.
  1.1578 +     *
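          +     * <p>Unlike {@link #invoke(ForkJoinTask)}, this method returns
          +     * without waiting for completion; for example (sketch only,
          +     * {@code pool} and {@code task} created elsewhere):
          +     *  <pre> {@code
          +     * ForkJoinTask<Long> pending = pool.submit(task);
          +     * // ... do other work ...
          +     * Long result = pending.join();
          +     * }</pre>
          +     *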
  1.1579 +     * @param task the task to submit
  1.1580 +     * @return the task
  1.1581 +     * @throws NullPointerException if the task is null
  1.1582 +     * @throws RejectedExecutionException if the task cannot be
  1.1583 +     *         scheduled for execution
  1.1584 +     */
  1.1585 +    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  1.1586 +        if (task == null)
  1.1587 +            throw new NullPointerException();
  1.1588 +        forkOrSubmit(task);
  1.1589 +        return task;
  1.1590 +    }
  1.1591 +
  1.1592 +    /**
  1.1593 +     * @throws NullPointerException if the task is null
  1.1594 +     * @throws RejectedExecutionException if the task cannot be
  1.1595 +     *         scheduled for execution
  1.1596 +     */
  1.1597 +    public <T> ForkJoinTask<T> submit(Callable<T> task) {
  1.1598 +        if (task == null)
  1.1599 +            throw new NullPointerException();
  1.1600 +        ForkJoinTask<T> job = ForkJoinTask.adapt(task);
  1.1601 +        forkOrSubmit(job);
  1.1602 +        return job;
  1.1603 +    }
  1.1604 +
  1.1605 +    /**
  1.1606 +     * @throws NullPointerException if the task is null
  1.1607 +     * @throws RejectedExecutionException if the task cannot be
  1.1608 +     *         scheduled for execution
  1.1609 +     */
  1.1610 +    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
  1.1611 +        if (task == null)
  1.1612 +            throw new NullPointerException();
  1.1613 +        ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
  1.1614 +        forkOrSubmit(job);
  1.1615 +        return job;
  1.1616 +    }
  1.1617 +
  1.1618 +    /**
  1.1619 +     * @throws NullPointerException if the task is null
  1.1620 +     * @throws RejectedExecutionException if the task cannot be
  1.1621 +     *         scheduled for execution
  1.1622 +     */
  1.1623 +    public ForkJoinTask<?> submit(Runnable task) {
  1.1624 +        if (task == null)
  1.1625 +            throw new NullPointerException();
  1.1626 +        ForkJoinTask<?> job;
  1.1627 +        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
  1.1628 +            job = (ForkJoinTask<?>) task;
  1.1629 +        else
  1.1630 +            job = ForkJoinTask.adapt(task, null);
  1.1631 +        forkOrSubmit(job);
  1.1632 +        return job;
  1.1633 +    }
  1.1634 +
  1.1635 +    /**
  1.1636 +     * @throws NullPointerException       {@inheritDoc}
  1.1637 +     * @throws RejectedExecutionException {@inheritDoc}
  1.1638 +     */
  1.1639 +    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
  1.1640 +        ArrayList<ForkJoinTask<T>> forkJoinTasks =
  1.1641 +            new ArrayList<ForkJoinTask<T>>(tasks.size());
  1.1642 +        for (Callable<T> task : tasks)
  1.1643 +            forkJoinTasks.add(ForkJoinTask.adapt(task));
  1.1644 +        invoke(new InvokeAll<T>(forkJoinTasks));
  1.1645 +
  1.1646 +        @SuppressWarnings({"unchecked", "rawtypes"})
  1.1647 +            List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
  1.1648 +        return futures;
  1.1649 +    }
  1.1650 +
  1.1651 +    static final class InvokeAll<T> extends RecursiveAction {
  1.1652 +        final ArrayList<ForkJoinTask<T>> tasks;
  1.1653 +        InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
  1.1654 +        public void compute() {
  1.1655 +            try { invokeAll(tasks); }
  1.1656 +            catch (Exception ignore) {}
  1.1657 +        }
  1.1658 +        private static final long serialVersionUID = -7914297376763021607L;
  1.1659 +    }
  1.1660 +
  1.1661 +    /**
  1.1662 +     * Returns the factory used for constructing new workers.
  1.1663 +     *
  1.1664 +     * @return the factory used for constructing new workers
  1.1665 +     */
  1.1666 +    public ForkJoinWorkerThreadFactory getFactory() {
  1.1667 +        return factory;
  1.1668 +    }
  1.1669 +
  1.1670 +    /**
  1.1671 +     * Returns the handler for internal worker threads that terminate
  1.1672 +     * due to unrecoverable errors encountered while executing tasks.
  1.1673 +     *
  1.1674 +     * @return the handler, or {@code null} if none
  1.1675 +     */
  1.1676 +    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
  1.1677 +        return ueh;
  1.1678 +    }
  1.1679 +
  1.1680 +    /**
  1.1681 +     * Returns the targeted parallelism level of this pool.
  1.1682 +     *
  1.1683 +     * @return the targeted parallelism level of this pool
  1.1684 +     */
  1.1685 +    public int getParallelism() {
  1.1686 +        return parallelism;
  1.1687 +    }
  1.1688 +
  1.1689 +    /**
  1.1690 +     * Returns the number of worker threads that have started but not
  1.1691 +     * yet terminated.  The result returned by this method may differ
  1.1692 +     * from {@link #getParallelism} when threads are created to
  1.1693 +     * maintain parallelism when others are cooperatively blocked.
  1.1694 +     *
  1.1695 +     * @return the number of worker threads
  1.1696 +     */
  1.1697 +    public int getPoolSize() {
  1.1698 +        return parallelism + (short)(ctl >>> TC_SHIFT);
  1.1699 +    }
  1.1700 +
  1.1701 +    /**
  1.1702 +     * Returns {@code true} if this pool uses local first-in-first-out
  1.1703 +     * scheduling mode for forked tasks that are never joined.
  1.1704 +     *
  1.1705 +     * @return {@code true} if this pool uses async mode
  1.1706 +     */
  1.1707 +    public boolean getAsyncMode() {
  1.1708 +        return locallyFifo;
  1.1709 +    }
  1.1710 +
  1.1711 +    /**
  1.1712 +     * Returns an estimate of the number of worker threads that are
  1.1713 +     * not blocked waiting to join tasks or for other managed
  1.1714 +     * synchronization. This method may overestimate the
  1.1715 +     * number of running threads.
  1.1716 +     *
  1.1717 +     * @return the number of worker threads
  1.1718 +     */
  1.1719 +    public int getRunningThreadCount() {
  1.1720 +        int r = parallelism + (int)(ctl >> AC_SHIFT);
  1.1721 +        return (r <= 0) ? 0 : r; // suppress momentarily negative values
  1.1722 +    }
  1.1723 +
  1.1724 +    /**
  1.1725 +     * Returns an estimate of the number of threads that are currently
  1.1726 +     * stealing or executing tasks. This method may overestimate the
  1.1727 +     * number of active threads.
  1.1728 +     *
  1.1729 +     * @return the number of active threads
  1.1730 +     */
  1.1731 +    public int getActiveThreadCount() {
  1.1732 +        int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
  1.1733 +        return (r <= 0) ? 0 : r; // suppress momentarily negative values
  1.1734 +    }
  1.1735 +
  1.1736 +    /**
  1.1737 +     * Returns {@code true} if all worker threads are currently idle.
  1.1738 +     * An idle worker is one that cannot obtain a task to execute
  1.1739 +     * because none are available to steal from other threads, and
  1.1740 +     * there are no pending submissions to the pool. This method is
  1.1741 +     * conservative; it might not return {@code true} immediately upon
  1.1742 +     * idleness of all threads, but will eventually become true if
  1.1743 +     * threads remain inactive.
  1.1744 +     *
  1.1745 +     * @return {@code true} if all threads are currently idle
  1.1746 +     */
  1.1747 +    public boolean isQuiescent() {
  1.1748 +        return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0;
  1.1749 +    }
  1.1750 +
  1.1751 +    /**
  1.1752 +     * Returns an estimate of the total number of tasks stolen from
  1.1753 +     * one thread's work queue by another. The reported value
  1.1754 +     * underestimates the actual total number of steals when the pool
  1.1755 +     * is not quiescent. This value may be useful for monitoring and
  1.1756 +     * tuning fork/join programs: in general, steal counts should be
  1.1757 +     * high enough to keep threads busy, but low enough to avoid
  1.1758 +     * overhead and contention across threads.
  1.1759 +     *
  1.1760 +     * @return the number of steals
  1.1761 +     */
  1.1762 +    public long getStealCount() {
  1.1763 +        return stealCount;
  1.1764 +    }
  1.1765 +
  1.1766 +    /**
  1.1767 +     * Returns an estimate of the total number of tasks currently held
  1.1768 +     * in queues by worker threads (but not including tasks submitted
  1.1769 +     * to the pool that have not begun executing). This value is only
  1.1770 +     * an approximation, obtained by iterating across all threads in
  1.1771 +     * the pool. This method may be useful for tuning task
  1.1772 +     * granularities.
  1.1773 +     *
  1.1774 +     * @return the number of queued tasks
  1.1775 +     */
  1.1776 +    public long getQueuedTaskCount() {
  1.1777 +        long count = 0;
  1.1778 +        ForkJoinWorkerThread[] ws;
  1.1779 +        if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
  1.1780 +            (ws = workers) != null) {
  1.1781 +            for (ForkJoinWorkerThread w : ws)
  1.1782 +                if (w != null)
  1.1783 +                    count -= w.queueBase - w.queueTop; // must read base first
  1.1784 +        }
  1.1785 +        return count;
  1.1786 +    }
  1.1787 +
  1.1788 +    /**
  1.1789 +     * Returns an estimate of the number of tasks submitted to this
  1.1790 +     * pool that have not yet begun executing.  This method may take
  1.1791 +     * time proportional to the number of submissions.
  1.1792 +     *
  1.1793 +     * @return the number of queued submissions
  1.1794 +     */
  1.1795 +    public int getQueuedSubmissionCount() {
  1.1796 +        return -queueBase + queueTop;
  1.1797 +    }
  1.1798 +
  1.1799 +    /**
  1.1800 +     * Returns {@code true} if there are any tasks submitted to this
  1.1801 +     * pool that have not yet begun executing.
  1.1802 +     *
  1.1803 +     * @return {@code true} if there are any queued submissions
  1.1804 +     */
  1.1805 +    public boolean hasQueuedSubmissions() {
  1.1806 +        return queueBase != queueTop;
  1.1807 +    }
  1.1808 +
  1.1809 +    /**
  1.1810 +     * Removes and returns the next unexecuted submission if one is
  1.1811 +     * available.  This method may be useful in extensions to this
  1.1812 +     * class that re-assign work in systems with multiple pools.
  1.1813 +     *
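          +     * <p>For instance, a (hypothetical) subclass could move one
          +     * pending submission to another pool:
          +     *  <pre> {@code
          +     * ForkJoinTask<?> t = pollSubmission();
          +     * if (t != null)
          +     *   otherPool.submit(t);
          +     * }</pre>
          +     *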
  1.1814 +     * @return the next submission, or {@code null} if none
  1.1815 +     */
  1.1816 +    protected ForkJoinTask<?> pollSubmission() {
  1.1817 +        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
  1.1818 +        while ((b = queueBase) != queueTop &&
  1.1819 +               (q = submissionQueue) != null &&
  1.1820 +               (i = (q.length - 1) & b) >= 0) {
  1.1821 +            long u = (i << ASHIFT) + ABASE;
  1.1822 +            if ((t = q[i]) != null &&
  1.1823 +                queueBase == b &&
  1.1824 +                UNSAFE.compareAndSwapObject(q, u, t, null)) {
  1.1825 +                queueBase = b + 1;
  1.1826 +                return t;
  1.1827 +            }
  1.1828 +        }
  1.1829 +        return null;
  1.1830 +    }
  1.1831 +
  1.1832 +    /**
  1.1833 +     * Removes all available unexecuted submitted and forked tasks
  1.1834 +     * from scheduling queues and adds them to the given collection,
  1.1835 +     * without altering their execution status. These may include
  1.1836 +     * artificially generated or wrapped tasks. This method is
  1.1837 +     * designed to be invoked only when the pool is known to be
  1.1838 +     * quiescent. Invocations at other times may not remove all
  1.1839 +     * tasks. A failure encountered while attempting to add elements
  1.1840 +     * to collection {@code c} may result in elements being in
  1.1841 +     * neither, either or both collections when the associated
  1.1842 +     * exception is thrown.  The behavior of this operation is
  1.1843 +     * undefined if the specified collection is modified while the
  1.1844 +     * operation is in progress.
  1.1845 +     *
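          +     * <p>An illustrative (hypothetical) extension that drains only
          +     * when the pool appears quiescent:
          +     *  <pre> {@code
          +     * class DrainingPool extends ForkJoinPool {
          +     *   List<ForkJoinTask<?>> drainIfQuiescent() {
          +     *     List<ForkJoinTask<?>> drained = new ArrayList<ForkJoinTask<?>>();
          +     *     if (isQuiescent())
          +     *       drainTasksTo(drained);
          +     *     return drained;
          +     *   }
          +     * }}</pre>
          +     *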
  1.1846 +     * @param c the collection to transfer elements into
  1.1847 +     * @return the number of elements transferred
  1.1848 +     */
  1.1849 +    protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
  1.1850 +        int count = 0;
  1.1851 +        while (queueBase != queueTop) {
  1.1852 +            ForkJoinTask<?> t = pollSubmission();
  1.1853 +            if (t != null) {
  1.1854 +                c.add(t);
  1.1855 +                ++count;
  1.1856 +            }
  1.1857 +        }
  1.1858 +        ForkJoinWorkerThread[] ws;
  1.1859 +        if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
  1.1860 +            (ws = workers) != null) {
  1.1861 +            for (ForkJoinWorkerThread w : ws)
  1.1862 +                if (w != null)
  1.1863 +                    count += w.drainTasksTo(c);
  1.1864 +        }
  1.1865 +        return count;
  1.1866 +    }
  1.1867 +
  1.1868 +    /**
  1.1869 +     * Returns a string identifying this pool, as well as its state,
  1.1870 +     * including indications of run state, parallelism level, and
  1.1871 +     * worker and task counts.
  1.1872 +     *
  1.1873 +     * @return a string identifying this pool, as well as its state
  1.1874 +     */
  1.1875 +    public String toString() {
  1.1876 +        long st = getStealCount();
  1.1877 +        long qt = getQueuedTaskCount();
  1.1878 +        long qs = getQueuedSubmissionCount();
  1.1879 +        int pc = parallelism;
  1.1880 +        long c = ctl;
  1.1881 +        int tc = pc + (short)(c >>> TC_SHIFT);
  1.1882 +        int rc = pc + (int)(c >> AC_SHIFT);
  1.1883 +        if (rc < 0) // ignore transient negative
  1.1884 +            rc = 0;
  1.1885 +        int ac = rc + blockedCount;
  1.1886 +        String level;
  1.1887 +        if ((c & STOP_BIT) != 0)
  1.1888 +            level = (tc == 0) ? "Terminated" : "Terminating";
  1.1889 +        else
  1.1890 +            level = shutdown ? "Shutting down" : "Running";
  1.1891 +        return super.toString() +
  1.1892 +            "[" + level +
  1.1893 +            ", parallelism = " + pc +
  1.1894 +            ", size = " + tc +
  1.1895 +            ", active = " + ac +
  1.1896 +            ", running = " + rc +
  1.1897 +            ", steals = " + st +
  1.1898 +            ", tasks = " + qt +
  1.1899 +            ", submissions = " + qs +
  1.1900 +            "]";
  1.1901 +    }
  1.1902 +
  1.1903 +    /**
  1.1904 +     * Initiates an orderly shutdown in which previously submitted
  1.1905 +     * tasks are executed, but no new tasks will be accepted.
  1.1906 +     * Invocation has no additional effect if already shut down.
  1.1907 +     * Tasks that are in the process of being submitted concurrently
  1.1908 +     * during the course of this method may or may not be rejected.
  1.1909 +     *
  1.1910 +     * @throws SecurityException if a security manager exists and
  1.1911 +     *         the caller is not permitted to modify threads
  1.1912 +     *         because it does not hold {@link
  1.1913 +     *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1.1914 +     */
  1.1915 +    public void shutdown() {
  1.1916 +        checkPermission();
  1.1917 +        shutdown = true;
  1.1918 +        tryTerminate(false);
  1.1919 +    }
  1.1920 +
  1.1921 +    /**
  1.1922 +     * Attempts to cancel and/or stop all tasks, and reject all
  1.1923 +     * subsequently submitted tasks.  Tasks that are in the process of
  1.1924 +     * being submitted or executed concurrently during the course of
  1.1925 +     * this method may or may not be rejected. This method cancels
  1.1926 +     * both existing and unexecuted tasks, in order to permit
  1.1927 +     * termination in the presence of task dependencies. So the method
  1.1928 +     * always returns an empty list (unlike the case for some other
  1.1929 +     * Executors).
  1.1930 +     *
  1.1931 +     * @return an empty list
  1.1932 +     * @throws SecurityException if a security manager exists and
  1.1933 +     *         the caller is not permitted to modify threads
  1.1934 +     *         because it does not hold {@link
  1.1935 +     *         java.lang.RuntimePermission}{@code ("modifyThread")}
  1.1936 +     */
  1.1937 +    public List<Runnable> shutdownNow() {
  1.1938 +        checkPermission();
  1.1939 +        shutdown = true;
  1.1940 +        tryTerminate(true);
  1.1941 +        return Collections.emptyList();
  1.1942 +    }
  1.1943 +
  1.1944 +    /**
  1.1945 +     * Returns {@code true} if all tasks have completed following shut down.
  1.1946 +     *
  1.1947 +     * @return {@code true} if all tasks have completed following shut down
  1.1948 +     */
  1.1949 +    public boolean isTerminated() {
  1.1950 +        long c = ctl;
  1.1951 +        return ((c & STOP_BIT) != 0L &&
  1.1952 +                (short)(c >>> TC_SHIFT) == -parallelism);
  1.1953 +    }
  1.1954 +
  1.1955 +    /**
  1.1956 +     * Returns {@code true} if the process of termination has
  1.1957 +     * commenced but not yet completed.  This method may be useful for
  1.1958 +     * debugging. A return of {@code true} reported a sufficient
  1.1959 +     * period after shutdown may indicate that submitted tasks have
  1.1960 +     * ignored or suppressed interruption, or are waiting for IO,
  1.1961 +     * causing this executor not to properly terminate. (See the
  1.1962 +     * advisory notes for class {@link ForkJoinTask} stating that
  1.1963 +     * tasks should not normally entail blocking operations.  But if
  1.1964 +     * they do, they must abort them on interrupt.)
  1.1965 +     *
  1.1966 +     * @return {@code true} if terminating but not yet terminated
  1.1967 +     */
  1.1968 +    public boolean isTerminating() {
  1.1969 +        long c = ctl;
  1.1970 +        return ((c & STOP_BIT) != 0L &&
  1.1971 +                (short)(c >>> TC_SHIFT) != -parallelism);
  1.1972 +    }
  1.1973 +
  1.1974 +    /**
  1.1975 +     * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
  1.1976 +     */
  1.1977 +    final boolean isAtLeastTerminating() {
  1.1978 +        return (ctl & STOP_BIT) != 0L;
  1.1979 +    }
  1.1980 +
  1.1981 +    /**
  1.1982 +     * Returns {@code true} if this pool has been shut down.
  1.1983 +     *
  1.1984 +     * @return {@code true} if this pool has been shut down
  1.1985 +     */
  1.1986 +    public boolean isShutdown() {
  1.1987 +        return shutdown;
  1.1988 +    }
  1.1989 +
  1.1990 +    /**
  1.1991 +     * Blocks until all tasks have completed execution after a shutdown
  1.1992 +     * request, or the timeout occurs, or the current thread is
  1.1993 +     * interrupted, whichever happens first.
  1.1994 +     *
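          +     * <p>A typical shutdown sequence, following the usual
          +     * {@link ExecutorService} pattern (sketch only):
          +     *  <pre> {@code
          +     * pool.shutdown();                              // stop accepting new tasks
          +     * if (!pool.awaitTermination(60, TimeUnit.SECONDS))
          +     *   pool.shutdownNow();                         // cancel lingering tasks
          +     * }</pre>
          +     *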
  1.1995 +     * @param timeout the maximum time to wait
  1.1996 +     * @param unit the time unit of the timeout argument
  1.1997 +     * @return {@code true} if this executor terminated and
  1.1998 +     *         {@code false} if the timeout elapsed before termination
  1.1999 +     * @throws InterruptedException if interrupted while waiting
  1.2000 +     */
  1.2001 +    public boolean awaitTermination(long timeout, TimeUnit unit)
  1.2002 +        throws InterruptedException {
  1.2003 +        long nanos = unit.toNanos(timeout);
  1.2004 +        final ReentrantLock lock = this.submissionLock;
  1.2005 +        lock.lock();
  1.2006 +        try {
  1.2007 +            for (;;) {
  1.2008 +                if (isTerminated())
  1.2009 +                    return true;
  1.2010 +                if (nanos <= 0)
  1.2011 +                    return false;
  1.2012 +                nanos = termination.awaitNanos(nanos);
  1.2013 +            }
  1.2014 +        } finally {
  1.2015 +            lock.unlock();
  1.2016 +        }
  1.2017 +    }
  1.2018 +
  1.2019 +    /**
  1.2020 +     * Interface for extending managed parallelism for tasks running
  1.2021 +     * in {@link ForkJoinPool}s.
  1.2022 +     *
  1.2023 +     * <p>A {@code ManagedBlocker} provides two methods.  Method
  1.2024 +     * {@code isReleasable} must return {@code true} if blocking is
  1.2025 +     * not necessary. Method {@code block} blocks the current thread
  1.2026 +     * if necessary (perhaps internally invoking {@code isReleasable}
  1.2027 +     * before actually blocking). These actions are performed by any
  1.2028 +     * thread invoking {@link ForkJoinPool#managedBlock}.  The
  1.2029 +     * unusual methods in this API accommodate synchronizers that may,
  1.2030 +     * but don't usually, block for long periods. Similarly, they
  1.2031 +     * allow more efficient internal handling of cases in which
  1.2032 +     * additional workers may be, but usually are not, needed to
  1.2033 +     * ensure sufficient parallelism.  Toward this end,
  1.2034 +     * implementations of method {@code isReleasable} must be amenable
  1.2035 +     * to repeated invocation.
  1.2036 +     *
  1.2037 +     * <p>For example, here is a ManagedBlocker based on a
  1.2038 +     * ReentrantLock:
  1.2039 +     *  <pre> {@code
  1.2040 +     * class ManagedLocker implements ManagedBlocker {
  1.2041 +     *   final ReentrantLock lock;
  1.2042 +     *   boolean hasLock = false;
  1.2043 +     *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
  1.2044 +     *   public boolean block() {
  1.2045 +     *     if (!hasLock)
  1.2046 +     *       lock.lock();
  1.2047 +     *     return true;
  1.2048 +     *   }
  1.2049 +     *   public boolean isReleasable() {
  1.2050 +     *     return hasLock || (hasLock = lock.tryLock());
  1.2051 +     *   }
  1.2052 +     * }}</pre>
  1.2053 +     *
  1.2054 +     * <p>Here is a class that possibly blocks waiting for an
  1.2055 +     * item on a given queue:
  1.2056 +     *  <pre> {@code
  1.2057 +     * class QueueTaker<E> implements ManagedBlocker {
  1.2058 +     *   final BlockingQueue<E> queue;
  1.2059 +     *   volatile E item = null;
  1.2060 +     *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
  1.2061 +     *   public boolean block() throws InterruptedException {
  1.2062 +     *     if (item == null)
  1.2063 +     *       item = queue.take();
  1.2064 +     *     return true;
  1.2065 +     *   }
  1.2066 +     *   public boolean isReleasable() {
  1.2067 +     *     return item != null || (item = queue.poll()) != null;
  1.2068 +     *   }
  1.2069 +     *   public E getItem() { // call after pool.managedBlock completes
  1.2070 +     *     return item;
  1.2071 +     *   }
  1.2072 +     * }}</pre>
  1.2073 +     */
  1.2074 +    public static interface ManagedBlocker {
  1.2075 +        /**
  1.2076 +         * Possibly blocks the current thread, for example waiting for
  1.2077 +         * a lock or condition.
  1.2078 +         *
  1.2079 +         * @return {@code true} if no additional blocking is necessary
  1.2080 +         * (i.e., if isReleasable would return true)
  1.2081 +         * @throws InterruptedException if interrupted while waiting
  1.2082 +         * (the method is not required to do so, but is allowed to)
  1.2083 +         */
  1.2084 +        boolean block() throws InterruptedException;
  1.2085 +
  1.2086 +        /**
  1.2087 +         * Returns {@code true} if blocking is unnecessary.
  1.2088 +         */
  1.2089 +        boolean isReleasable();
  1.2090 +    }
  1.2091 +
  1.2092 +    /**
  1.2093 +     * Blocks in accord with the given blocker.  If the current thread
  1.2094 +     * is a {@link ForkJoinWorkerThread}, this method possibly
  1.2095 +     * arranges for a spare thread to be activated if necessary to
  1.2096 +     * ensure sufficient parallelism while the current thread is blocked.
  1.2097 +     *
  1.2098 +     * <p>If the caller is not a {@link ForkJoinTask}, this method is
  1.2099 +     * behaviorally equivalent to
  1.2100 +     *  <pre> {@code
  1.2101 +     * while (!blocker.isReleasable())
  1.2102 +     *   if (blocker.block())
  1.2103 +     *     return;
  1.2104 +     * }</pre>
  1.2105 +     *
  1.2106 +     * If the caller is a {@code ForkJoinTask}, then the pool may
  1.2107 +     * first be expanded to ensure parallelism, and later adjusted.
  1.2108 +     *
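          +     * <p>For example, using the {@code ManagedLocker} sketch shown in
          +     * the {@link ManagedBlocker} documentation (where {@code lock} is a
          +     * previously constructed ReentrantLock):
          +     *  <pre> {@code
          +     * ManagedLocker locker = new ManagedLocker(lock);
          +     * ForkJoinPool.managedBlock(locker);
          +     * // lock is now held; a spare worker may have been activated
          +     * }</pre>
          +     *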
  1.2109 +     * @param blocker the blocker
  1.2110 +     * @throws InterruptedException if blocker.block did so
  1.2111 +     */
  1.2112 +    public static void managedBlock(ManagedBlocker blocker)
  1.2113 +        throws InterruptedException {
  1.2114 +        Thread t = Thread.currentThread();
  1.2115 +        if (t instanceof ForkJoinWorkerThread) {
  1.2116 +            ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
  1.2117 +            w.pool.awaitBlocker(blocker);
  1.2118 +        }
  1.2119 +        else {
  1.2120 +            do {} while (!blocker.isReleasable() && !blocker.block());
  1.2121 +        }
  1.2122 +    }
  1.2123 +
  1.2124 +    // AbstractExecutorService overrides.  These rely on undocumented
  1.2125 +    // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
  1.2126 +    // implement RunnableFuture.
  1.2127 +
  1.2128 +    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  1.2129 +        return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
  1.2130 +    }
  1.2131 +
  1.2132 +    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  1.2133 +        return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
  1.2134 +    }
  1.2135 +
  1.2136 +    // Unsafe mechanics
  1.2137 +    private static final sun.misc.Unsafe UNSAFE;
  1.2138 +    private static final long ctlOffset;
  1.2139 +    private static final long stealCountOffset;
  1.2140 +    private static final long blockedCountOffset;
  1.2141 +    private static final long quiescerCountOffset;
  1.2142 +    private static final long scanGuardOffset;
  1.2143 +    private static final long nextWorkerNumberOffset;
  1.2144 +    private static final long ABASE;
  1.2145 +    private static final int ASHIFT;
  1.2146 +
  1.2147 +    static {
  1.2148 +        poolNumberGenerator = new AtomicInteger();
  1.2149 +        workerSeedGenerator = new Random();
  1.2150 +        modifyThreadPermission = new RuntimePermission("modifyThread");
  1.2151 +        defaultForkJoinWorkerThreadFactory =
  1.2152 +            new DefaultForkJoinWorkerThreadFactory();
  1.2153 +        int s;
  1.2154 +        try {
  1.2155 +            UNSAFE = sun.misc.Unsafe.getUnsafe();
  1.2156 +            Class k = ForkJoinPool.class;
  1.2157 +            ctlOffset = UNSAFE.objectFieldOffset
  1.2158 +                (k.getDeclaredField("ctl"));
  1.2159 +            stealCountOffset = UNSAFE.objectFieldOffset
  1.2160 +                (k.getDeclaredField("stealCount"));
  1.2161 +            blockedCountOffset = UNSAFE.objectFieldOffset
  1.2162 +                (k.getDeclaredField("blockedCount"));
  1.2163 +            quiescerCountOffset = UNSAFE.objectFieldOffset
  1.2164 +                (k.getDeclaredField("quiescerCount"));
  1.2165 +            scanGuardOffset = UNSAFE.objectFieldOffset
  1.2166 +                (k.getDeclaredField("scanGuard"));
  1.2167 +            nextWorkerNumberOffset = UNSAFE.objectFieldOffset
  1.2168 +                (k.getDeclaredField("nextWorkerNumber"));
  1.2169 +            Class a = ForkJoinTask[].class;
  1.2170 +            ABASE = UNSAFE.arrayBaseOffset(a);
  1.2171 +            s = UNSAFE.arrayIndexScale(a);
  1.2172 +        } catch (Exception e) {
  1.2173 +            throw new Error(e);
  1.2174 +        }
  1.2175 +        if ((s & (s-1)) != 0)
  1.2176 +            throw new Error("data type scale not a power of two");
  1.2177 +        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
  1.2178 +    }
  1.2179 +
  1.2180 +}