rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
branch jdk7-b147
changeset 1890 212417b74b72
child 1896 9984d9a62bc0
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java	Sat Mar 19 10:46:31 2016 +0100
     1.3 @@ -0,0 +1,998 @@
     1.4 +/*
     1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     1.6 + *
     1.7 + * This code is free software; you can redistribute it and/or modify it
     1.8 + * under the terms of the GNU General Public License version 2 only, as
     1.9 + * published by the Free Software Foundation.  Oracle designates this
    1.10 + * particular file as subject to the "Classpath" exception as provided
    1.11 + * by Oracle in the LICENSE file that accompanied this code.
    1.12 + *
    1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
    1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    1.15 + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    1.16 + * version 2 for more details (a copy is included in the LICENSE file that
    1.17 + * accompanied this code).
    1.18 + *
    1.19 + * You should have received a copy of the GNU General Public License version
    1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
    1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    1.22 + *
    1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    1.24 + * or visit www.oracle.com if you need additional information or have any
    1.25 + * questions.
    1.26 + */
    1.27 +
    1.28 +/*
    1.29 + * This file is available under and governed by the GNU General Public
    1.30 + * License version 2 only, as published by the Free Software Foundation.
    1.31 + * However, the following notice accompanied the original version of this
    1.32 + * file:
    1.33 + *
    1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
    1.35 + * Expert Group and released to the public domain, as explained at
    1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
    1.37 + */
    1.38 +
    1.39 +package java.util.concurrent;
    1.40 +
    1.41 +import java.util.Collection;
    1.42 +import java.util.concurrent.RejectedExecutionException;
    1.43 +
    1.44 +/**
    1.45 + * A thread managed by a {@link ForkJoinPool}, which executes
    1.46 + * {@link ForkJoinTask}s.
    1.47 + * This class is subclassable solely for the sake of adding
    1.48 + * functionality -- there are no overridable methods dealing with
    1.49 + * scheduling or execution.  However, you can override initialization
    1.50 + * and termination methods surrounding the main task processing loop.
    1.51 + * If you do create such a subclass, you will also need to supply a
    1.52 + * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
    1.53 + * in a {@code ForkJoinPool}.
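          + *
          + * <p>For example, here is a minimal sketch of such a subclass and
          + * factory (the names {@code TaggedWorker} and {@code tag} are
          + * illustrative only, not part of this API):
          + *
          + * <pre> {@code
          + * class TaggedWorker extends ForkJoinWorkerThread {
          + *   final String tag;                  // per-worker bookkeeping
          + *   TaggedWorker(ForkJoinPool pool, String tag) {
          + *     super(pool);
          + *     this.tag = tag;
          + *   }
          + *   protected void onStart() {
          + *     super.onStart();                 // must come first
          + *     // ... per-thread initialization ...
          + *   }
          + *   protected void onTermination(Throwable exception) {
          + *     // ... per-thread cleanup ...
          + *     super.onTermination(exception);  // must come last
          + *   }
          + * }
          + *
          + * ForkJoinPool.ForkJoinWorkerThreadFactory factory =
          + *   new ForkJoinPool.ForkJoinWorkerThreadFactory() {
          + *     public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
          + *       return new TaggedWorker(pool, "aux");
          + *     }
          + *   };
          + * }</pre>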
    1.54 + *
    1.55 + * @since 1.7
    1.56 + * @author Doug Lea
    1.57 + */
    1.58 +public class ForkJoinWorkerThread extends Thread {
    1.59 +    /*
    1.60 +     * Overview:
    1.61 +     *
    1.62 +     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
    1.63 +     * ForkJoinTasks. This class includes bookkeeping in support of
    1.64 +     * worker activation, suspension, and lifecycle control described
    1.65 +     * in more detail in the internal documentation of class
    1.66 +     * ForkJoinPool. And as described further below, this class also
    1.67 +     * includes special-cased support for some ForkJoinTask
    1.68 +     * methods. But the main mechanics involve work-stealing:
    1.69 +     *
    1.70 +     * Work-stealing queues are special forms of Deques that support
    1.71 +     * only three of the four possible end-operations -- push, pop,
    1.72 +     * and deq (aka steal), under the further constraints that push
    1.73 +     * and pop are called only from the owning thread, while deq may
    1.74 +     * be called from other threads.  (If you are unfamiliar with
    1.75 +     * them, you probably want to read Herlihy and Shavit's book "The
     1.76 +     * Art of Multiprocessor Programming", chapter 16 describing these
    1.77 +     * in more detail before proceeding.)  The main work-stealing
    1.78 +     * queue design is roughly similar to those in the papers "Dynamic
    1.79 +     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
    1.80 +     * (http://research.sun.com/scalable/pubs/index.html) and
    1.81 +     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
    1.82 +     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
    1.83 +     * The main differences ultimately stem from gc requirements that
    1.84 +     * we null out taken slots as soon as we can, to maintain as small
    1.85 +     * a footprint as possible even in programs generating huge
    1.86 +     * numbers of tasks. To accomplish this, we shift the CAS
    1.87 +     * arbitrating pop vs deq (steal) from being on the indices
    1.88 +     * ("queueBase" and "queueTop") to the slots themselves (mainly
    1.89 +     * via method "casSlotNull()"). So, both a successful pop and deq
    1.90 +     * mainly entail a CAS of a slot from non-null to null.  Because
    1.91 +     * we rely on CASes of references, we do not need tag bits on
    1.92 +     * queueBase or queueTop.  They are simple ints as used in any
    1.93 +     * circular array-based queue (see for example ArrayDeque).
    1.94 +     * Updates to the indices must still be ordered in a way that
    1.95 +     * guarantees that queueTop == queueBase means the queue is empty,
    1.96 +     * but otherwise may err on the side of possibly making the queue
     1.97 +     * appear nonempty when a push, pop, or deq has not fully
    1.98 +     * committed. Note that this means that the deq operation,
    1.99 +     * considered individually, is not wait-free. One thief cannot
   1.100 +     * successfully continue until another in-progress one (or, if
   1.101 +     * previously empty, a push) completes.  However, in the
   1.102 +     * aggregate, we ensure at least probabilistic non-blockingness.
   1.103 +     * If an attempted steal fails, a thief always chooses a different
   1.104 +     * random victim target to try next. So, in order for one thief to
   1.105 +     * progress, it suffices for any in-progress deq or new push on
   1.106 +     * any empty queue to complete.
   1.107 +     *
   1.108 +     * This approach also enables support for "async mode" where local
   1.109 +     * task processing is in FIFO, not LIFO order; simply by using a
   1.110 +     * version of deq rather than pop when locallyFifo is true (as set
   1.111 +     * by the ForkJoinPool).  This allows use in message-passing
    1.112 +     * frameworks in which tasks are never joined.  However, neither
    1.113 +     * mode considers affinities, loads, cache localities, etc., so
    1.114 +     * either rarely provides the best possible performance on a given
    1.115 +     * machine, but both portably provide good throughput by averaging over
   1.116 +     * these factors.  (Further, even if we did try to use such
   1.117 +     * information, we do not usually have a basis for exploiting
   1.118 +     * it. For example, some sets of tasks profit from cache
   1.119 +     * affinities, but others are harmed by cache pollution effects.)
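          +     *
          +     * (For reference, a pool selects this async mode through the
          +     * asyncMode argument of the four-argument ForkJoinPool
          +     * constructor; its value is shadowed here in field locallyFifo.)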
   1.120 +     *
   1.121 +     * When a worker would otherwise be blocked waiting to join a
   1.122 +     * task, it first tries a form of linear helping: Each worker
   1.123 +     * records (in field currentSteal) the most recent task it stole
   1.124 +     * from some other worker. Plus, it records (in field currentJoin)
   1.125 +     * the task it is currently actively joining. Method joinTask uses
   1.126 +     * these markers to try to find a worker to help (i.e., steal back
   1.127 +     * a task from and execute it) that could hasten completion of the
   1.128 +     * actively joined task. In essence, the joiner executes a task
   1.129 +     * that would be on its own local deque had the to-be-joined task
   1.130 +     * not been stolen. This may be seen as a conservative variant of
   1.131 +     * the approach in Wagner & Calder "Leapfrogging: a portable
   1.132 +     * technique for implementing efficient futures" SIGPLAN Notices,
   1.133 +     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
   1.134 +     * in that: (1) We only maintain dependency links across workers
   1.135 +     * upon steals, rather than use per-task bookkeeping.  This may
    1.136 +     * require a linear scan of the workers array to locate stealers, but
   1.137 +     * usually doesn't because stealers leave hints (that may become
   1.138 +     * stale/wrong) of where to locate them. This isolates cost to
   1.139 +     * when it is needed, rather than adding to per-task overhead.
   1.140 +     * (2) It is "shallow", ignoring nesting and potentially cyclic
   1.141 +     * mutual steals.  (3) It is intentionally racy: field currentJoin
   1.142 +     * is updated only while actively joining, which means that we
   1.143 +     * miss links in the chain during long-lived tasks, GC stalls etc
   1.144 +     * (which is OK since blocking in such cases is usually a good
   1.145 +     * idea).  (4) We bound the number of attempts to find work (see
   1.146 +     * MAX_HELP) and fall back to suspending the worker and if
   1.147 +     * necessary replacing it with another.
   1.148 +     *
   1.149 +     * Efficient implementation of these algorithms currently relies
   1.150 +     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
   1.151 +     * correct orderings, reads and writes of variable queueBase
   1.152 +     * require volatile ordering.  Variable queueTop need not be
   1.153 +     * volatile because non-local reads always follow those of
   1.154 +     * queueBase.  Similarly, because they are protected by volatile
   1.155 +     * queueBase reads, reads of the queue array and its slots by
   1.156 +     * other threads do not need volatile load semantics, but writes
   1.157 +     * (in push) require store order and CASes (in pop and deq)
   1.158 +     * require (volatile) CAS semantics.  (Michael, Saraswat, and
   1.159 +     * Vechev's algorithm has similar properties, but without support
   1.160 +     * for nulling slots.)  Since these combinations aren't supported
   1.161 +     * using ordinary volatiles, the only way to accomplish these
   1.162 +     * efficiently is to use direct Unsafe calls. (Using external
   1.163 +     * AtomicIntegers and AtomicReferenceArrays for the indices and
   1.164 +     * array is significantly slower because of memory locality and
   1.165 +     * indirection effects.)
   1.166 +     *
   1.167 +     * Further, performance on most platforms is very sensitive to
   1.168 +     * placement and sizing of the (resizable) queue array.  Even
   1.169 +     * though these queues don't usually become all that big, the
   1.170 +     * initial size must be large enough to counteract cache
   1.171 +     * contention effects across multiple queues (especially in the
   1.172 +     * presence of GC cardmarking). Also, to improve thread-locality,
   1.173 +     * queues are initialized after starting.
   1.174 +     */
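          +
          +    /*
          +     * For exposition only (this class uses raw Unsafe operations
          +     * instead), the slot-CAS arbitration between owner pop and
          +     * thief deq described above can be sketched with an
          +     * AtomicReferenceArray<ForkJoinTask<?>> q:
          +     *
          +     *   ForkJoinTask<?> pop() {                // owner thread only
          +     *     int s = queueTop;
          +     *     if (s != queueBase) {
          +     *       int i = --s & (q.length() - 1);
          +     *       ForkJoinTask<?> t = q.get(i);
          +     *       if (t != null && q.compareAndSet(i, t, null)) {
          +     *         queueTop = s;
          +     *         return t;                        // won the slot
          +     *       }
          +     *     }
          +     *     return null;                         // empty or lost to a thief
          +     *   }
          +     *
          +     *   ForkJoinTask<?> deq() {                // any thread (steal)
          +     *     int b = queueBase;
          +     *     if (b != queueTop) {
          +     *       int i = b & (q.length() - 1);
          +     *       ForkJoinTask<?> t = q.get(i);
          +     *       if (t != null && queueBase == b &&
          +     *           q.compareAndSet(i, t, null)) {
          +     *         queueBase = b + 1;
          +     *         return t;                        // won the slot
          +     *       }
          +     *     }
          +     *     return null;                         // empty or contended
          +     *   }
          +     */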
   1.175 +
   1.176 +    /**
   1.177 +     * Mask for pool indices encoded as shorts
   1.178 +     */
   1.179 +    private static final int  SMASK  = 0xffff;
   1.180 +
   1.181 +    /**
   1.182 +     * Capacity of work-stealing queue array upon initialization.
   1.183 +     * Must be a power of two. Initial size must be at least 4, but is
   1.184 +     * padded to minimize cache effects.
   1.185 +     */
   1.186 +    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
   1.187 +
   1.188 +    /**
   1.189 +     * Maximum size for queue array. Must be a power of two
   1.190 +     * less than or equal to 1 << (31 - width of array entry) to
   1.191 +     * ensure lack of index wraparound, but is capped at a lower
   1.192 +     * value to help users trap runaway computations.
   1.193 +     */
   1.194 +    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
   1.195 +
   1.196 +    /**
   1.197 +     * The work-stealing queue array. Size must be a power of two.
    1.198 +     * Initialized when started (as opposed to when constructed), to
   1.199 +     * improve memory locality.
   1.200 +     */
   1.201 +    ForkJoinTask<?>[] queue;
   1.202 +
   1.203 +    /**
   1.204 +     * The pool this thread works in. Accessed directly by ForkJoinTask.
   1.205 +     */
   1.206 +    final ForkJoinPool pool;
   1.207 +
   1.208 +    /**
   1.209 +     * Index (mod queue.length) of next queue slot to push to or pop
   1.210 +     * from. It is written only by owner thread, and accessed by other
   1.211 +     * threads only after reading (volatile) queueBase.  Both queueTop
   1.212 +     * and queueBase are allowed to wrap around on overflow, but
   1.213 +     * (queueTop - queueBase) still estimates size.
   1.214 +     */
   1.215 +    int queueTop;
   1.216 +
   1.217 +    /**
   1.218 +     * Index (mod queue.length) of least valid queue slot, which is
   1.219 +     * always the next position to steal from if nonempty.
   1.220 +     */
   1.221 +    volatile int queueBase;
   1.222 +
   1.223 +    /**
    1.224 +     * The index of the most recent stealer, used as a hint to avoid
   1.225 +     * traversal in method helpJoinTask. This is only a hint because a
   1.226 +     * worker might have had multiple steals and this only holds one
   1.227 +     * of them (usually the most current). Declared non-volatile,
   1.228 +     * relying on other prevailing sync to keep reasonably current.
   1.229 +     */
   1.230 +    int stealHint;
   1.231 +
   1.232 +    /**
   1.233 +     * Index of this worker in pool array. Set once by pool before
   1.234 +     * running, and accessed directly by pool to locate this worker in
   1.235 +     * its workers array.
   1.236 +     */
   1.237 +    final int poolIndex;
   1.238 +
   1.239 +    /**
   1.240 +     * Encoded record for pool task waits. Usages are always
   1.241 +     * surrounded by volatile reads/writes
   1.242 +     */
   1.243 +    int nextWait;
   1.244 +
   1.245 +    /**
   1.246 +     * Complement of poolIndex, offset by count of entries of task
   1.247 +     * waits. Accessed by ForkJoinPool to manage event waiters.
   1.248 +     */
   1.249 +    volatile int eventCount;
   1.250 +
   1.251 +    /**
   1.252 +     * Seed for random number generator for choosing steal victims.
   1.253 +     * Uses Marsaglia xorshift. Must be initialized as nonzero.
   1.254 +     */
   1.255 +    int seed;
   1.256 +
   1.257 +    /**
   1.258 +     * Number of steals. Directly accessed (and reset) by pool when
   1.259 +     * idle.
   1.260 +     */
   1.261 +    int stealCount;
   1.262 +
   1.263 +    /**
   1.264 +     * True if this worker should or did terminate
   1.265 +     */
   1.266 +    volatile boolean terminate;
   1.267 +
   1.268 +    /**
   1.269 +     * Set to true before LockSupport.park; false on return
   1.270 +     */
   1.271 +    volatile boolean parked;
   1.272 +
   1.273 +    /**
   1.274 +     * True if use local fifo, not default lifo, for local polling.
   1.275 +     * Shadows value from ForkJoinPool.
   1.276 +     */
   1.277 +    final boolean locallyFifo;
   1.278 +
   1.279 +    /**
   1.280 +     * The task most recently stolen from another worker (or
   1.281 +     * submission queue).  All uses are surrounded by enough volatile
   1.282 +     * reads/writes to maintain as non-volatile.
   1.283 +     */
   1.284 +    ForkJoinTask<?> currentSteal;
   1.285 +
   1.286 +    /**
   1.287 +     * The task currently being joined, set only when actively trying
   1.288 +     * to help other stealers in helpJoinTask. All uses are surrounded
   1.289 +     * by enough volatile reads/writes to maintain as non-volatile.
   1.290 +     */
   1.291 +    ForkJoinTask<?> currentJoin;
   1.292 +
   1.293 +    /**
   1.294 +     * Creates a ForkJoinWorkerThread operating in the given pool.
   1.295 +     *
   1.296 +     * @param pool the pool this thread works in
   1.297 +     * @throws NullPointerException if pool is null
   1.298 +     */
   1.299 +    protected ForkJoinWorkerThread(ForkJoinPool pool) {
   1.300 +        super(pool.nextWorkerName());
   1.301 +        this.pool = pool;
   1.302 +        int k = pool.registerWorker(this);
   1.303 +        poolIndex = k;
   1.304 +        eventCount = ~k & SMASK; // clear wait count
   1.305 +        locallyFifo = pool.locallyFifo;
   1.306 +        Thread.UncaughtExceptionHandler ueh = pool.ueh;
   1.307 +        if (ueh != null)
   1.308 +            setUncaughtExceptionHandler(ueh);
   1.309 +        setDaemon(true);
   1.310 +    }
   1.311 +
   1.312 +    // Public methods
   1.313 +
   1.314 +    /**
   1.315 +     * Returns the pool hosting this thread.
   1.316 +     *
   1.317 +     * @return the pool
   1.318 +     */
   1.319 +    public ForkJoinPool getPool() {
   1.320 +        return pool;
   1.321 +    }
   1.322 +
   1.323 +    /**
   1.324 +     * Returns the index number of this thread in its pool.  The
   1.325 +     * returned value ranges from zero to the maximum number of
   1.326 +     * threads (minus one) that have ever been created in the pool.
   1.327 +     * This method may be useful for applications that track status or
   1.328 +     * collect results per-worker rather than per-task.
   1.329 +     *
   1.330 +     * @return the index number
   1.331 +     */
   1.332 +    public int getPoolIndex() {
   1.333 +        return poolIndex;
   1.334 +    }
   1.335 +
   1.336 +    // Randomization
   1.337 +
   1.338 +    /**
   1.339 +     * Computes next value for random victim probes and backoffs.
   1.340 +     * Scans don't require a very high quality generator, but also not
   1.341 +     * a crummy one.  Marsaglia xor-shift is cheap and works well
   1.342 +     * enough.  Note: This is manually inlined in FJP.scan() to avoid
   1.343 +     * writes inside busy loops.
   1.344 +     */
   1.345 +    private int nextSeed() {
   1.346 +        int r = seed;
   1.347 +        r ^= r << 13;
   1.348 +        r ^= r >>> 17;
   1.349 +        r ^= r << 5;
   1.350 +        return seed = r;
   1.351 +    }
   1.352 +
   1.353 +    // Run State management
   1.354 +
   1.355 +    /**
   1.356 +     * Initializes internal state after construction but before
   1.357 +     * processing any tasks. If you override this method, you must
   1.358 +     * invoke {@code super.onStart()} at the beginning of the method.
   1.359 +     * Initialization requires care: Most fields must have legal
   1.360 +     * default values, to ensure that attempted accesses from other
   1.361 +     * threads work correctly even before this thread starts
   1.362 +     * processing tasks.
   1.363 +     */
   1.364 +    protected void onStart() {
   1.365 +        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
   1.366 +        int r = pool.workerSeedGenerator.nextInt();
   1.367 +        seed = (r == 0) ? 1 : r; //  must be nonzero
   1.368 +    }
   1.369 +
   1.370 +    /**
   1.371 +     * Performs cleanup associated with termination of this worker
   1.372 +     * thread.  If you override this method, you must invoke
   1.373 +     * {@code super.onTermination} at the end of the overridden method.
   1.374 +     *
   1.375 +     * @param exception the exception causing this thread to abort due
   1.376 +     * to an unrecoverable error, or {@code null} if completed normally
   1.377 +     */
   1.378 +    protected void onTermination(Throwable exception) {
   1.379 +        try {
   1.380 +            terminate = true;
   1.381 +            cancelTasks();
   1.382 +            pool.deregisterWorker(this, exception);
   1.383 +        } catch (Throwable ex) {        // Shouldn't ever happen
    1.384 +            if (exception == null)      // but if so, at least rethrow
   1.385 +                exception = ex;
   1.386 +        } finally {
   1.387 +            if (exception != null)
   1.388 +                UNSAFE.throwException(exception);
   1.389 +        }
   1.390 +    }
   1.391 +
   1.392 +    /**
   1.393 +     * This method is required to be public, but should never be
   1.394 +     * called explicitly. It performs the main run loop to execute
   1.395 +     * {@link ForkJoinTask}s.
   1.396 +     */
   1.397 +    public void run() {
   1.398 +        Throwable exception = null;
   1.399 +        try {
   1.400 +            onStart();
   1.401 +            pool.work(this);
   1.402 +        } catch (Throwable ex) {
   1.403 +            exception = ex;
   1.404 +        } finally {
   1.405 +            onTermination(exception);
   1.406 +        }
   1.407 +    }
   1.408 +
   1.409 +    /*
   1.410 +     * Intrinsics-based atomic writes for queue slots. These are
   1.411 +     * basically the same as methods in AtomicReferenceArray, but
   1.412 +     * specialized for (1) ForkJoinTask elements (2) requirement that
   1.413 +     * nullness and bounds checks have already been performed by
   1.414 +     * callers and (3) effective offsets are known not to overflow
   1.415 +     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
   1.416 +     * need corresponding version for reads: plain array reads are OK
   1.417 +     * because they are protected by other volatile reads and are
   1.418 +     * confirmed by CASes.
   1.419 +     *
   1.420 +     * Most uses don't actually call these methods, but instead
   1.421 +     * contain inlined forms that enable more predictable
   1.422 +     * optimization.  We don't define the version of write used in
   1.423 +     * pushTask at all, but instead inline there a store-fenced array
   1.424 +     * slot write.
   1.425 +     *
   1.426 +     * Also in most methods, as a performance (not correctness) issue,
   1.427 +     * we'd like to encourage compilers not to arbitrarily postpone
   1.428 +     * setting queueTop after writing slot.  Currently there is no
   1.429 +     * intrinsic for arranging this, but using Unsafe putOrderedInt
   1.430 +     * may be a preferable strategy on some compilers even though its
   1.431 +     * main effect is a pre-, not post- fence. To simplify possible
   1.432 +     * changes, the option is left in comments next to the associated
   1.433 +     * assignments.
   1.434 +     */
   1.435 +
   1.436 +    /**
   1.437 +     * CASes slot i of array q from t to null. Caller must ensure q is
   1.438 +     * non-null and index is in range.
   1.439 +     */
   1.440 +    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
   1.441 +                                             ForkJoinTask<?> t) {
   1.442 +        return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
   1.443 +    }
   1.444 +
   1.445 +    /**
   1.446 +     * Performs a volatile write of the given task at given slot of
   1.447 +     * array q.  Caller must ensure q is non-null and index is in
   1.448 +     * range. This method is used only during resets and backouts.
   1.449 +     */
   1.450 +    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
   1.451 +                                        ForkJoinTask<?> t) {
   1.452 +        UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
   1.453 +    }
   1.454 +
   1.455 +    // queue methods
   1.456 +
   1.457 +    /**
   1.458 +     * Pushes a task. Call only from this thread.
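          +     * (In this design, ForkJoinTask.fork() delegates here when the
          +     * calling thread is a worker.)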
   1.459 +     *
   1.460 +     * @param t the task. Caller must ensure non-null.
   1.461 +     */
   1.462 +    final void pushTask(ForkJoinTask<?> t) {
   1.463 +        ForkJoinTask<?>[] q; int s, m;
   1.464 +        if ((q = queue) != null) {    // ignore if queue removed
   1.465 +            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
   1.466 +            UNSAFE.putOrderedObject(q, u, t);
   1.467 +            queueTop = s + 1;         // or use putOrderedInt
   1.468 +            if ((s -= queueBase) <= 2)
   1.469 +                pool.signalWork();
   1.470 +            else if (s == m)
   1.471 +                growQueue();
   1.472 +        }
   1.473 +    }
   1.474 +
   1.475 +    /**
   1.476 +     * Creates or doubles queue array.  Transfers elements by
   1.477 +     * emulating steals (deqs) from old array and placing, oldest
   1.478 +     * first, into new array.
   1.479 +     */
   1.480 +    private void growQueue() {
   1.481 +        ForkJoinTask<?>[] oldQ = queue;
   1.482 +        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
   1.483 +        if (size > MAXIMUM_QUEUE_CAPACITY)
   1.484 +            throw new RejectedExecutionException("Queue capacity exceeded");
   1.485 +        if (size < INITIAL_QUEUE_CAPACITY)
   1.486 +            size = INITIAL_QUEUE_CAPACITY;
   1.487 +        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
   1.488 +        int mask = size - 1;
   1.489 +        int top = queueTop;
   1.490 +        int oldMask;
   1.491 +        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
   1.492 +            for (int b = queueBase; b != top; ++b) {
   1.493 +                long u = ((b & oldMask) << ASHIFT) + ABASE;
   1.494 +                Object x = UNSAFE.getObjectVolatile(oldQ, u);
   1.495 +                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
   1.496 +                    UNSAFE.putObjectVolatile
   1.497 +                        (q, ((b & mask) << ASHIFT) + ABASE, x);
   1.498 +            }
   1.499 +        }
   1.500 +    }
   1.501 +
   1.502 +    /**
   1.503 +     * Tries to take a task from the base of the queue, failing if
   1.504 +     * empty or contended. Note: Specializations of this code appear
   1.505 +     * in locallyDeqTask and elsewhere.
   1.506 +     *
   1.507 +     * @return a task, or null if none or contended
   1.508 +     */
   1.509 +    final ForkJoinTask<?> deqTask() {
   1.510 +        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
   1.511 +        if (queueTop != (b = queueBase) &&
   1.512 +            (q = queue) != null && // must read q after b
   1.513 +            (i = (q.length - 1) & b) >= 0 &&
   1.514 +            (t = q[i]) != null && queueBase == b &&
   1.515 +            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
   1.516 +            queueBase = b + 1;
   1.517 +            return t;
   1.518 +        }
   1.519 +        return null;
   1.520 +    }
   1.521 +
   1.522 +    /**
   1.523 +     * Tries to take a task from the base of own queue.  Called only
   1.524 +     * by this thread.
   1.525 +     *
   1.526 +     * @return a task, or null if none
   1.527 +     */
   1.528 +    final ForkJoinTask<?> locallyDeqTask() {
   1.529 +        ForkJoinTask<?> t; int m, b, i;
   1.530 +        ForkJoinTask<?>[] q = queue;
   1.531 +        if (q != null && (m = q.length - 1) >= 0) {
   1.532 +            while (queueTop != (b = queueBase)) {
   1.533 +                if ((t = q[i = m & b]) != null &&
   1.534 +                    queueBase == b &&
   1.535 +                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
   1.536 +                                                t, null)) {
   1.537 +                    queueBase = b + 1;
   1.538 +                    return t;
   1.539 +                }
   1.540 +            }
   1.541 +        }
   1.542 +        return null;
   1.543 +    }
   1.544 +
   1.545 +    /**
   1.546 +     * Returns a popped task, or null if empty.
   1.547 +     * Called only by this thread.
   1.548 +     */
   1.549 +    private ForkJoinTask<?> popTask() {
   1.550 +        int m;
   1.551 +        ForkJoinTask<?>[] q = queue;
   1.552 +        if (q != null && (m = q.length - 1) >= 0) {
   1.553 +            for (int s; (s = queueTop) != queueBase;) {
   1.554 +                int i = m & --s;
   1.555 +                long u = (i << ASHIFT) + ABASE; // raw offset
   1.556 +                ForkJoinTask<?> t = q[i];
   1.557 +                if (t == null)   // lost to stealer
   1.558 +                    break;
   1.559 +                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
   1.560 +                    queueTop = s; // or putOrderedInt
   1.561 +                    return t;
   1.562 +                }
   1.563 +            }
   1.564 +        }
   1.565 +        return null;
   1.566 +    }
   1.567 +
   1.568 +    /**
   1.569 +     * Specialized version of popTask to pop only if topmost element
   1.570 +     * is the given task. Called only by this thread.
   1.571 +     *
   1.572 +     * @param t the task. Caller must ensure non-null.
   1.573 +     */
   1.574 +    final boolean unpushTask(ForkJoinTask<?> t) {
   1.575 +        ForkJoinTask<?>[] q;
   1.576 +        int s;
   1.577 +        if ((q = queue) != null && (s = queueTop) != queueBase &&
   1.578 +            UNSAFE.compareAndSwapObject
   1.579 +            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
   1.580 +            queueTop = s; // or putOrderedInt
   1.581 +            return true;
   1.582 +        }
   1.583 +        return false;
   1.584 +    }
   1.585 +
   1.586 +    /**
   1.587 +     * Returns next task, or null if empty or contended.
   1.588 +     */
   1.589 +    final ForkJoinTask<?> peekTask() {
   1.590 +        int m;
   1.591 +        ForkJoinTask<?>[] q = queue;
   1.592 +        if (q == null || (m = q.length - 1) < 0)
   1.593 +            return null;
   1.594 +        int i = locallyFifo ? queueBase : (queueTop - 1);
   1.595 +        return q[i & m];
   1.596 +    }
   1.597 +
   1.598 +    // Support methods for ForkJoinPool
   1.599 +
   1.600 +    /**
    1.601 +     * Runs the given task, plus any local tasks until the queue is empty.
   1.602 +     */
   1.603 +    final void execTask(ForkJoinTask<?> t) {
   1.604 +        currentSteal = t;
   1.605 +        for (;;) {
   1.606 +            if (t != null)
   1.607 +                t.doExec();
   1.608 +            if (queueTop == queueBase)
   1.609 +                break;
   1.610 +            t = locallyFifo ? locallyDeqTask() : popTask();
   1.611 +        }
   1.612 +        ++stealCount;
   1.613 +        currentSteal = null;
   1.614 +    }
   1.615 +
   1.616 +    /**
   1.617 +     * Removes and cancels all tasks in queue.  Can be called from any
   1.618 +     * thread.
   1.619 +     */
   1.620 +    final void cancelTasks() {
   1.621 +        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
   1.622 +        if (cj != null && cj.status >= 0)
   1.623 +            cj.cancelIgnoringExceptions();
   1.624 +        ForkJoinTask<?> cs = currentSteal;
   1.625 +        if (cs != null && cs.status >= 0)
   1.626 +            cs.cancelIgnoringExceptions();
   1.627 +        while (queueBase != queueTop) {
   1.628 +            ForkJoinTask<?> t = deqTask();
   1.629 +            if (t != null)
   1.630 +                t.cancelIgnoringExceptions();
   1.631 +        }
   1.632 +    }
   1.633 +
   1.634 +    /**
   1.635 +     * Drains tasks to given collection c.
   1.636 +     *
   1.637 +     * @return the number of tasks drained
   1.638 +     */
   1.639 +    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
   1.640 +        int n = 0;
   1.641 +        while (queueBase != queueTop) {
   1.642 +            ForkJoinTask<?> t = deqTask();
   1.643 +            if (t != null) {
   1.644 +                c.add(t);
   1.645 +                ++n;
   1.646 +            }
   1.647 +        }
   1.648 +        return n;
   1.649 +    }
   1.650 +
   1.651 +    // Support methods for ForkJoinTask
   1.652 +
   1.653 +    /**
   1.654 +     * Returns an estimate of the number of tasks in the queue.
   1.655 +     */
   1.656 +    final int getQueueSize() {
   1.657 +        return queueTop - queueBase;
   1.658 +    }
   1.659 +
   1.660 +    /**
   1.661 +     * Gets and removes a local task.
   1.662 +     *
   1.663 +     * @return a task, if available
   1.664 +     */
   1.665 +    final ForkJoinTask<?> pollLocalTask() {
   1.666 +        return locallyFifo ? locallyDeqTask() : popTask();
   1.667 +    }
   1.668 +
   1.669 +    /**
   1.670 +     * Gets and removes a local or stolen task.
   1.671 +     *
   1.672 +     * @return a task, if available
   1.673 +     */
   1.674 +    final ForkJoinTask<?> pollTask() {
   1.675 +        ForkJoinWorkerThread[] ws;
   1.676 +        ForkJoinTask<?> t = pollLocalTask();
   1.677 +        if (t != null || (ws = pool.workers) == null)
   1.678 +            return t;
   1.679 +        int n = ws.length; // cheap version of FJP.scan
   1.680 +        int steps = n << 1;
   1.681 +        int r = nextSeed();
   1.682 +        int i = 0;
   1.683 +        while (i < steps) {
   1.684 +            ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
   1.685 +            if (w != null && w.queueBase != w.queueTop && w.queue != null) {
   1.686 +                if ((t = w.deqTask()) != null)
   1.687 +                    return t;
   1.688 +                i = 0;
   1.689 +            }
   1.690 +        }
   1.691 +        return null;
   1.692 +    }
   1.693 +
   1.694 +    /**
   1.695 +     * The maximum stolen->joining link depth allowed in helpJoinTask,
   1.696 +     * as well as the maximum number of retries (allowing on average
   1.697 +     * one staleness retry per level) per attempt to instead try
   1.698 +     * compensation.  Depths for legitimate chains are unbounded, but
   1.699 +     * we use a fixed constant to avoid (otherwise unchecked) cycles
   1.700 +     * and bound staleness of traversal parameters at the expense of
   1.701 +     * sometimes blocking when we could be helping.
   1.702 +     */
   1.703 +    private static final int MAX_HELP = 16;
   1.704 +
   1.705 +    /**
   1.706 +     * Possibly runs some tasks and/or blocks, until joinMe is done.
   1.707 +     *
   1.708 +     * @param joinMe the task to join
   1.709 +     * @return completion status on exit
   1.710 +     */
   1.711 +    final int joinTask(ForkJoinTask<?> joinMe) {
   1.712 +        ForkJoinTask<?> prevJoin = currentJoin;
   1.713 +        currentJoin = joinMe;
   1.714 +        for (int s, retries = MAX_HELP;;) {
   1.715 +            if ((s = joinMe.status) < 0) {
   1.716 +                currentJoin = prevJoin;
   1.717 +                return s;
   1.718 +            }
   1.719 +            if (retries > 0) {
   1.720 +                if (queueTop != queueBase) {
   1.721 +                    if (!localHelpJoinTask(joinMe))
   1.722 +                        retries = 0;           // cannot help
   1.723 +                }
   1.724 +                else if (retries == MAX_HELP >>> 1) {
   1.725 +                    --retries;                 // check uncommon case
   1.726 +                    if (tryDeqAndExec(joinMe) >= 0)
   1.727 +                        Thread.yield();        // for politeness
   1.728 +                }
   1.729 +                else
   1.730 +                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
   1.731 +            }
   1.732 +            else {
   1.733 +                retries = MAX_HELP;           // restart if not done
   1.734 +                pool.tryAwaitJoin(joinMe);
   1.735 +            }
   1.736 +        }
   1.737 +    }
   1.738 +
   1.739 +    /**
   1.740 +     * If present, pops and executes the given task, or any other
   1.741 +     * cancelled task
   1.742 +     *
   1.743 +     * @return false if any other non-cancelled task exists in local queue
   1.744 +     */
   1.745 +    private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
   1.746 +        int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
   1.747 +        if ((s = queueTop) != queueBase && (q = queue) != null &&
   1.748 +            (i = (q.length - 1) & --s) >= 0 &&
   1.749 +            (t = q[i]) != null) {
   1.750 +            if (t != joinMe && t.status >= 0)
   1.751 +                return false;
   1.752 +            if (UNSAFE.compareAndSwapObject
   1.753 +                (q, (i << ASHIFT) + ABASE, t, null)) {
   1.754 +                queueTop = s;           // or putOrderedInt
   1.755 +                t.doExec();
   1.756 +            }
   1.757 +        }
   1.758 +        return true;
   1.759 +    }
   1.760 +
   1.761 +    /**
   1.762 +     * Tries to locate and execute tasks for a stealer of the given
    1.763 +     * task, or in turn one of its stealers.  Traces
   1.764 +     * currentSteal->currentJoin links looking for a thread working on
   1.765 +     * a descendant of the given task and with a non-empty queue to
   1.766 +     * steal back and execute tasks from.  The implementation is very
   1.767 +     * branchy to cope with potential inconsistencies or loops
   1.768 +     * encountering chains that are stale, unknown, or of length
   1.769 +     * greater than MAX_HELP links.  All of these cases are dealt with
    1.770 +     * by having the caller simply retry.
   1.771 +     *
   1.772 +     * @param joinMe the task to join
   1.774 +     * @return true if ran a task
   1.775 +     */
   1.776 +    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
   1.777 +        boolean helped = false;
   1.778 +        int m = pool.scanGuard & SMASK;
   1.779 +        ForkJoinWorkerThread[] ws = pool.workers;
   1.780 +        if (ws != null && ws.length > m && joinMe.status >= 0) {
   1.781 +            int levels = MAX_HELP;              // remaining chain length
   1.782 +            ForkJoinTask<?> task = joinMe;      // base of chain
   1.783 +            outer:for (ForkJoinWorkerThread thread = this;;) {
   1.784 +                // Try to find v, the stealer of task, by first using hint
   1.785 +                ForkJoinWorkerThread v = ws[thread.stealHint & m];
   1.786 +                if (v == null || v.currentSteal != task) {
   1.787 +                    for (int j = 0; ;) {        // search array
   1.788 +                        if ((v = ws[j]) != null && v.currentSteal == task) {
   1.789 +                            thread.stealHint = j;
   1.790 +                            break;              // save hint for next time
   1.791 +                        }
   1.792 +                        if (++j > m)
   1.793 +                            break outer;        // can't find stealer
   1.794 +                    }
   1.795 +                }
   1.796 +                // Try to help v, using specialized form of deqTask
   1.797 +                for (;;) {
   1.798 +                    ForkJoinTask<?>[] q; int b, i;
   1.799 +                    if (joinMe.status < 0)
   1.800 +                        break outer;
   1.801 +                    if ((b = v.queueBase) == v.queueTop ||
   1.802 +                        (q = v.queue) == null ||
   1.803 +                        (i = (q.length-1) & b) < 0)
   1.804 +                        break;                  // empty
   1.805 +                    long u = (i << ASHIFT) + ABASE;
   1.806 +                    ForkJoinTask<?> t = q[i];
   1.807 +                    if (task.status < 0)
   1.808 +                        break outer;            // stale
   1.809 +                    if (t != null && v.queueBase == b &&
   1.810 +                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
   1.811 +                        v.queueBase = b + 1;
   1.812 +                        v.stealHint = poolIndex;
   1.813 +                        ForkJoinTask<?> ps = currentSteal;
   1.814 +                        currentSteal = t;
   1.815 +                        t.doExec();
   1.816 +                        currentSteal = ps;
   1.817 +                        helped = true;
   1.818 +                    }
   1.819 +                }
   1.820 +                // Try to descend to find v's stealer
   1.821 +                ForkJoinTask<?> next = v.currentJoin;
   1.822 +                if (--levels > 0 && task.status >= 0 &&
   1.823 +                    next != null && next != task) {
   1.824 +                    task = next;
   1.825 +                    thread = v;
   1.826 +                }
   1.827 +                else
   1.828 +                    break;  // max levels, stale, dead-end, or cyclic
   1.829 +            }
   1.830 +        }
   1.831 +        return helped;
   1.832 +    }
   1.833 +
   1.834 +    /**
   1.835 +     * Performs an uncommon case for joinTask: If task t is at base of
    1.836 +     * some worker's queue, steals and executes it.
   1.837 +     *
   1.838 +     * @param t the task
   1.839 +     * @return t's status
   1.840 +     */
   1.841 +    private int tryDeqAndExec(ForkJoinTask<?> t) {
   1.842 +        int m = pool.scanGuard & SMASK;
   1.843 +        ForkJoinWorkerThread[] ws = pool.workers;
   1.844 +        if (ws != null && ws.length > m && t.status >= 0) {
   1.845 +            for (int j = 0; j <= m; ++j) {
   1.846 +                ForkJoinTask<?>[] q; int b, i;
   1.847 +                ForkJoinWorkerThread v = ws[j];
   1.848 +                if (v != null &&
   1.849 +                    (b = v.queueBase) != v.queueTop &&
   1.850 +                    (q = v.queue) != null &&
   1.851 +                    (i = (q.length - 1) & b) >= 0 &&
    1.852 +                    q[i] == t) {
   1.853 +                    long u = (i << ASHIFT) + ABASE;
   1.854 +                    if (v.queueBase == b &&
   1.855 +                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
   1.856 +                        v.queueBase = b + 1;
   1.857 +                        v.stealHint = poolIndex;
   1.858 +                        ForkJoinTask<?> ps = currentSteal;
   1.859 +                        currentSteal = t;
   1.860 +                        t.doExec();
   1.861 +                        currentSteal = ps;
   1.862 +                    }
   1.863 +                    break;
   1.864 +                }
   1.865 +            }
   1.866 +        }
   1.867 +        return t.status;
   1.868 +    }
   1.869 +
   1.870 +    /**
   1.871 +     * Implements ForkJoinTask.getSurplusQueuedTaskCount().  Returns
   1.872 +     * an estimate of the number of tasks, offset by a function of
   1.873 +     * number of idle workers.
   1.874 +     *
   1.875 +     * This method provides a cheap heuristic guide for task
   1.876 +     * partitioning when programmers, frameworks, tools, or languages
   1.877 +     * have little or no idea about task granularity.  In essence by
   1.878 +     * offering this method, we ask users only about tradeoffs in
   1.879 +     * overhead vs expected throughput and its variance, rather than
   1.880 +     * how finely to partition tasks.
   1.881 +     *
   1.882 +     * In a steady state strict (tree-structured) computation, each
   1.883 +     * thread makes available for stealing enough tasks for other
   1.884 +     * threads to remain active. Inductively, if all threads play by
   1.885 +     * the same rules, each thread should make available only a
   1.886 +     * constant number of tasks.
   1.887 +     *
   1.888 +     * The minimum useful constant is just 1. But using a value of 1
   1.889 +     * would require immediate replenishment upon each steal to
   1.890 +     * maintain enough tasks, which is infeasible.  Further,
   1.891 +     * partitionings/granularities of offered tasks should minimize
   1.892 +     * steal rates, which in general means that threads nearer the top
   1.893 +     * of computation tree should generate more than those nearer the
   1.894 +     * bottom. In perfect steady state, each thread is at
   1.895 +     * approximately the same level of computation tree. However,
   1.896 +     * producing extra tasks amortizes the uncertainty of progress and
   1.897 +     * diffusion assumptions.
   1.898 +     *
   1.899 +     * So, users will want to use values larger, but not much larger
   1.900 +     * than 1 to both smooth over transient shortages and hedge
    1.901 +     * against uneven progress, as traded off against the cost of
   1.902 +     * extra task overhead. We leave the user to pick a threshold
   1.903 +     * value to compare with the results of this call to guide
   1.904 +     * decisions, but recommend values such as 3.
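          +     *
          +     * As an illustrative sketch (Subtask, size, and THRESHOLD are
          +     * hypothetical application-level names), a divide-and-conquer
          +     * computation might apply that recommendation as:
          +     *
          +     *   while (size > THRESHOLD && getSurplusQueuedTaskCount() <= 3) {
          +     *     size >>>= 1;                    // keep half the work ...
          +     *     new Subtask(this, size).fork(); // ... and offer the rest to stealers
          +     *   }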
   1.905 +     *
   1.906 +     * When all threads are active, it is on average OK to estimate
   1.907 +     * surplus strictly locally. In steady-state, if one thread is
   1.908 +     * maintaining say 2 surplus tasks, then so are others. So we can
   1.909 +     * just use estimated queue length (although note that (queueTop -
   1.910 +     * queueBase) can be an overestimate because of stealers lagging
   1.911 +     * increments of queueBase).  However, this strategy alone leads
   1.912 +     * to serious mis-estimates in some non-steady-state conditions
   1.913 +     * (ramp-up, ramp-down, other stalls). We can detect many of these
    1.914 +     * by further considering the number of "idle" threads, which are
   1.915 +     * known to have zero queued tasks, so compensate by a factor of
   1.916 +     * (#idle/#active) threads.
   1.917 +     */
   1.918 +    final int getEstimatedSurplusTaskCount() {
   1.919 +        return queueTop - queueBase - pool.idlePerActive();
   1.920 +    }
   1.921 +
   1.922 +    /**
   1.923 +     * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
   1.924 +     * pool's active count ctl maintenance, but rather than blocking
   1.925 +     * when tasks cannot be found, we rescan until all others cannot
    1.926 +     * find tasks either. The bracketing by pool quiescerCount
   1.927 +     * updates suppresses pool auto-shutdown mechanics that could
   1.928 +     * otherwise prematurely terminate the pool because all threads
   1.929 +     * appear to be inactive.
   1.930 +     */
   1.931 +    final void helpQuiescePool() {
   1.932 +        boolean active = true;
   1.933 +        ForkJoinTask<?> ps = currentSteal; // to restore below
   1.934 +        ForkJoinPool p = pool;
   1.935 +        p.addQuiescerCount(1);
   1.936 +        for (;;) {
   1.937 +            ForkJoinWorkerThread[] ws = p.workers;
   1.938 +            ForkJoinWorkerThread v = null;
   1.939 +            int n;
   1.940 +            if (queueTop != queueBase)
   1.941 +                v = this;
   1.942 +            else if (ws != null && (n = ws.length) > 1) {
   1.943 +                ForkJoinWorkerThread w;
   1.944 +                int r = nextSeed(); // cheap version of FJP.scan
   1.945 +                int steps = n << 1;
   1.946 +                for (int i = 0; i < steps; ++i) {
   1.947 +                    if ((w = ws[(i + r) & (n - 1)]) != null &&
   1.948 +                        w.queueBase != w.queueTop) {
   1.949 +                        v = w;
   1.950 +                        break;
   1.951 +                    }
   1.952 +                }
   1.953 +            }
   1.954 +            if (v != null) {
   1.955 +                ForkJoinTask<?> t;
   1.956 +                if (!active) {
   1.957 +                    active = true;
   1.958 +                    p.addActiveCount(1);
   1.959 +                }
   1.960 +                if ((t = (v != this) ? v.deqTask() :
   1.961 +                     locallyFifo ? locallyDeqTask() : popTask()) != null) {
   1.962 +                    currentSteal = t;
   1.963 +                    t.doExec();
   1.964 +                    currentSteal = ps;
   1.965 +                }
   1.966 +            }
   1.967 +            else {
   1.968 +                if (active) {
   1.969 +                    active = false;
   1.970 +                    p.addActiveCount(-1);
   1.971 +                }
   1.972 +                if (p.isQuiescent()) {
   1.973 +                    p.addActiveCount(1);
   1.974 +                    p.addQuiescerCount(-1);
   1.975 +                    break;
   1.976 +                }
   1.977 +            }
   1.978 +        }
   1.979 +    }
   1.980 +
   1.981 +    // Unsafe mechanics
   1.982 +    private static final sun.misc.Unsafe UNSAFE;
   1.983 +    private static final long ABASE;
   1.984 +    private static final int ASHIFT;
   1.985 +
   1.986 +    static {
   1.987 +        int s;
   1.988 +        try {
   1.989 +            UNSAFE = sun.misc.Unsafe.getUnsafe();
    1.990 +            Class<?> a = ForkJoinTask[].class;
   1.991 +            ABASE = UNSAFE.arrayBaseOffset(a);
   1.992 +            s = UNSAFE.arrayIndexScale(a);
   1.993 +        } catch (Exception e) {
   1.994 +            throw new Error(e);
   1.995 +        }
   1.996 +        if ((s & (s-1)) != 0)
   1.997 +            throw new Error("data type scale not a power of two");
   1.998 +        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
   1.999 +    }
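          +
          +    // Worked example of the arithmetic above: with a 4-byte reference
          +    // scale, s == 4, so ASHIFT == 31 - 29 == 2, and slot i of a queue
          +    // array lives at raw offset ABASE + (i << 2).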
  1.1000 +
  1.1001 +}