1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,998 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +
1.41 +import java.util.Collection;
1.42 +import java.util.concurrent.RejectedExecutionException;
1.43 +
1.44 +/**
1.45 + * A thread managed by a {@link ForkJoinPool}, which executes
1.46 + * {@link ForkJoinTask}s.
1.47 + * This class is subclassable solely for the sake of adding
1.48 + * functionality -- there are no overridable methods dealing with
1.49 + * scheduling or execution. However, you can override initialization
1.50 + * and termination methods surrounding the main task processing loop.
1.51 + * If you do create such a subclass, you will also need to supply a
1.52 + * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
1.53 + * in a {@code ForkJoinPool}.
1.54 + *
1.55 + * @since 1.7
1.56 + * @author Doug Lea
1.57 + */
1.58 +public class ForkJoinWorkerThread extends Thread {
1.59 + /*
1.60 + * Overview:
1.61 + *
1.62 + * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
1.63 + * ForkJoinTasks. This class includes bookkeeping in support of
1.64 + * worker activation, suspension, and lifecycle control described
1.65 + * in more detail in the internal documentation of class
1.66 + * ForkJoinPool. And as described further below, this class also
1.67 + * includes special-cased support for some ForkJoinTask
1.68 + * methods. But the main mechanics involve work-stealing:
1.69 + *
1.70 + * Work-stealing queues are special forms of Deques that support
1.71 + * only three of the four possible end-operations -- push, pop,
1.72 + * and deq (aka steal), under the further constraints that push
1.73 + * and pop are called only from the owning thread, while deq may
1.74 + * be called from other threads. (If you are unfamiliar with
1.75 + * them, you probably want to read Herlihy and Shavit's book "The
1.76 + * Art of Multiprocessor programming", chapter 16 describing these
1.77 + * in more detail before proceeding.) The main work-stealing
1.78 + * queue design is roughly similar to those in the papers "Dynamic
1.79 + * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
1.80 + * (http://research.sun.com/scalable/pubs/index.html) and
1.81 + * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
1.82 + * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
1.83 + * The main differences ultimately stem from gc requirements that
1.84 + * we null out taken slots as soon as we can, to maintain as small
1.85 + * a footprint as possible even in programs generating huge
1.86 + * numbers of tasks. To accomplish this, we shift the CAS
1.87 + * arbitrating pop vs deq (steal) from being on the indices
1.88 + * ("queueBase" and "queueTop") to the slots themselves (mainly
1.89 + * via method "casSlotNull()"). So, both a successful pop and deq
1.90 + * mainly entail a CAS of a slot from non-null to null. Because
1.91 + * we rely on CASes of references, we do not need tag bits on
1.92 + * queueBase or queueTop. They are simple ints as used in any
1.93 + * circular array-based queue (see for example ArrayDeque).
1.94 + * Updates to the indices must still be ordered in a way that
1.95 + * guarantees that queueTop == queueBase means the queue is empty,
1.96 + * but otherwise may err on the side of possibly making the queue
1.97 + * appear nonempty when a push, pop, or deq have not fully
1.98 + * committed. Note that this means that the deq operation,
1.99 + * considered individually, is not wait-free. One thief cannot
1.100 + * successfully continue until another in-progress one (or, if
1.101 + * previously empty, a push) completes. However, in the
1.102 + * aggregate, we ensure at least probabilistic non-blockingness.
1.103 + * If an attempted steal fails, a thief always chooses a different
1.104 + * random victim target to try next. So, in order for one thief to
1.105 + * progress, it suffices for any in-progress deq or new push on
1.106 + * any empty queue to complete.
1.107 + *
1.108 + * This approach also enables support for "async mode" where local
1.109 + * task processing is in FIFO, not LIFO order; simply by using a
1.110 + * version of deq rather than pop when locallyFifo is true (as set
1.111 + * by the ForkJoinPool). This allows use in message-passing
1.112 + * frameworks in which tasks are never joined. However neither
1.113 + * mode considers affinities, loads, cache localities, etc, so
1.114 + * rarely provide the best possible performance on a given
1.115 + * machine, but portably provide good throughput by averaging over
1.116 + * these factors. (Further, even if we did try to use such
1.117 + * information, we do not usually have a basis for exploiting
1.118 + * it. For example, some sets of tasks profit from cache
1.119 + * affinities, but others are harmed by cache pollution effects.)
1.120 + *
1.121 + * When a worker would otherwise be blocked waiting to join a
1.122 + * task, it first tries a form of linear helping: Each worker
1.123 + * records (in field currentSteal) the most recent task it stole
1.124 + * from some other worker. Plus, it records (in field currentJoin)
1.125 + * the task it is currently actively joining. Method joinTask uses
1.126 + * these markers to try to find a worker to help (i.e., steal back
1.127 + * a task from and execute it) that could hasten completion of the
1.128 + * actively joined task. In essence, the joiner executes a task
1.129 + * that would be on its own local deque had the to-be-joined task
1.130 + * not been stolen. This may be seen as a conservative variant of
1.131 + * the approach in Wagner & Calder "Leapfrogging: a portable
1.132 + * technique for implementing efficient futures" SIGPLAN Notices,
1.133 + * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
1.134 + * in that: (1) We only maintain dependency links across workers
1.135 + * upon steals, rather than use per-task bookkeeping. This may
1.136 + * require a linear scan of workers array to locate stealers, but
1.137 + * usually doesn't because stealers leave hints (that may become
1.138 + * stale/wrong) of where to locate them. This isolates cost to
1.139 + * when it is needed, rather than adding to per-task overhead.
1.140 + * (2) It is "shallow", ignoring nesting and potentially cyclic
1.141 + * mutual steals. (3) It is intentionally racy: field currentJoin
1.142 + * is updated only while actively joining, which means that we
1.143 + * miss links in the chain during long-lived tasks, GC stalls etc
1.144 + * (which is OK since blocking in such cases is usually a good
1.145 + * idea). (4) We bound the number of attempts to find work (see
1.146 + * MAX_HELP) and fall back to suspending the worker and if
1.147 + * necessary replacing it with another.
1.148 + *
1.149 + * Efficient implementation of these algorithms currently relies
1.150 + * on an uncomfortable amount of "Unsafe" mechanics. To maintain
1.151 + * correct orderings, reads and writes of variable queueBase
1.152 + * require volatile ordering. Variable queueTop need not be
1.153 + * volatile because non-local reads always follow those of
1.154 + * queueBase. Similarly, because they are protected by volatile
1.155 + * queueBase reads, reads of the queue array and its slots by
1.156 + * other threads do not need volatile load semantics, but writes
1.157 + * (in push) require store order and CASes (in pop and deq)
1.158 + * require (volatile) CAS semantics. (Michael, Saraswat, and
1.159 + * Vechev's algorithm has similar properties, but without support
1.160 + * for nulling slots.) Since these combinations aren't supported
1.161 + * using ordinary volatiles, the only way to accomplish these
1.162 + * efficiently is to use direct Unsafe calls. (Using external
1.163 + * AtomicIntegers and AtomicReferenceArrays for the indices and
1.164 + * array is significantly slower because of memory locality and
1.165 + * indirection effects.)
1.166 + *
1.167 + * Further, performance on most platforms is very sensitive to
1.168 + * placement and sizing of the (resizable) queue array. Even
1.169 + * though these queues don't usually become all that big, the
1.170 + * initial size must be large enough to counteract cache
1.171 + * contention effects across multiple queues (especially in the
1.172 + * presence of GC cardmarking). Also, to improve thread-locality,
1.173 + * queues are initialized after starting.
1.174 + */
1.175 +
1.176 + /**
1.177 + * Mask for pool indices encoded as shorts
1.178 + */
1.179 + private static final int SMASK = 0xffff;
1.180 +
1.181 + /**
1.182 + * Capacity of work-stealing queue array upon initialization.
1.183 + * Must be a power of two. Initial size must be at least 4, but is
1.184 + * padded to minimize cache effects.
1.185 + */
1.186 + private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
1.187 +
1.188 + /**
1.189 + * Maximum size for queue array. Must be a power of two
1.190 + * less than or equal to 1 << (31 - width of array entry) to
1.191 + * ensure lack of index wraparound, but is capped at a lower
1.192 + * value to help users trap runaway computations.
1.193 + */
1.194 + private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
1.195 +
1.196 + /**
1.197 + * The work-stealing queue array. Size must be a power of two.
1.198 + * Initialized when started (as oposed to when constructed), to
1.199 + * improve memory locality.
1.200 + */
1.201 + ForkJoinTask<?>[] queue;
1.202 +
1.203 + /**
1.204 + * The pool this thread works in. Accessed directly by ForkJoinTask.
1.205 + */
1.206 + final ForkJoinPool pool;
1.207 +
1.208 + /**
1.209 + * Index (mod queue.length) of next queue slot to push to or pop
1.210 + * from. It is written only by owner thread, and accessed by other
1.211 + * threads only after reading (volatile) queueBase. Both queueTop
1.212 + * and queueBase are allowed to wrap around on overflow, but
1.213 + * (queueTop - queueBase) still estimates size.
1.214 + */
1.215 + int queueTop;
1.216 +
1.217 + /**
1.218 + * Index (mod queue.length) of least valid queue slot, which is
1.219 + * always the next position to steal from if nonempty.
1.220 + */
1.221 + volatile int queueBase;
1.222 +
1.223 + /**
1.224 + * The index of most recent stealer, used as a hint to avoid
1.225 + * traversal in method helpJoinTask. This is only a hint because a
1.226 + * worker might have had multiple steals and this only holds one
1.227 + * of them (usually the most current). Declared non-volatile,
1.228 + * relying on other prevailing sync to keep reasonably current.
1.229 + */
1.230 + int stealHint;
1.231 +
1.232 + /**
1.233 + * Index of this worker in pool array. Set once by pool before
1.234 + * running, and accessed directly by pool to locate this worker in
1.235 + * its workers array.
1.236 + */
1.237 + final int poolIndex;
1.238 +
1.239 + /**
1.240 + * Encoded record for pool task waits. Usages are always
1.241 + * surrounded by volatile reads/writes
1.242 + */
1.243 + int nextWait;
1.244 +
1.245 + /**
1.246 + * Complement of poolIndex, offset by count of entries of task
1.247 + * waits. Accessed by ForkJoinPool to manage event waiters.
1.248 + */
1.249 + volatile int eventCount;
1.250 +
1.251 + /**
1.252 + * Seed for random number generator for choosing steal victims.
1.253 + * Uses Marsaglia xorshift. Must be initialized as nonzero.
1.254 + */
1.255 + int seed;
1.256 +
1.257 + /**
1.258 + * Number of steals. Directly accessed (and reset) by pool when
1.259 + * idle.
1.260 + */
1.261 + int stealCount;
1.262 +
1.263 + /**
1.264 + * True if this worker should or did terminate
1.265 + */
1.266 + volatile boolean terminate;
1.267 +
1.268 + /**
1.269 + * Set to true before LockSupport.park; false on return
1.270 + */
1.271 + volatile boolean parked;
1.272 +
1.273 + /**
1.274 + * True if use local fifo, not default lifo, for local polling.
1.275 + * Shadows value from ForkJoinPool.
1.276 + */
1.277 + final boolean locallyFifo;
1.278 +
1.279 + /**
1.280 + * The task most recently stolen from another worker (or
1.281 + * submission queue). All uses are surrounded by enough volatile
1.282 + * reads/writes to maintain as non-volatile.
1.283 + */
1.284 + ForkJoinTask<?> currentSteal;
1.285 +
1.286 + /**
1.287 + * The task currently being joined, set only when actively trying
1.288 + * to help other stealers in helpJoinTask. All uses are surrounded
1.289 + * by enough volatile reads/writes to maintain as non-volatile.
1.290 + */
1.291 + ForkJoinTask<?> currentJoin;
1.292 +
1.293 + /**
1.294 + * Creates a ForkJoinWorkerThread operating in the given pool.
1.295 + *
1.296 + * @param pool the pool this thread works in
1.297 + * @throws NullPointerException if pool is null
1.298 + */
1.299 + protected ForkJoinWorkerThread(ForkJoinPool pool) {
1.300 + super(pool.nextWorkerName());
1.301 + this.pool = pool;
1.302 + int k = pool.registerWorker(this);
1.303 + poolIndex = k;
1.304 + eventCount = ~k & SMASK; // clear wait count
1.305 + locallyFifo = pool.locallyFifo;
1.306 + Thread.UncaughtExceptionHandler ueh = pool.ueh;
1.307 + if (ueh != null)
1.308 + setUncaughtExceptionHandler(ueh);
1.309 + setDaemon(true);
1.310 + }
1.311 +
1.312 + // Public methods
1.313 +
1.314 + /**
1.315 + * Returns the pool hosting this thread.
1.316 + *
1.317 + * @return the pool
1.318 + */
1.319 + public ForkJoinPool getPool() {
1.320 + return pool;
1.321 + }
1.322 +
1.323 + /**
1.324 + * Returns the index number of this thread in its pool. The
1.325 + * returned value ranges from zero to the maximum number of
1.326 + * threads (minus one) that have ever been created in the pool.
1.327 + * This method may be useful for applications that track status or
1.328 + * collect results per-worker rather than per-task.
1.329 + *
1.330 + * @return the index number
1.331 + */
1.332 + public int getPoolIndex() {
1.333 + return poolIndex;
1.334 + }
1.335 +
1.336 + // Randomization
1.337 +
1.338 + /**
1.339 + * Computes next value for random victim probes and backoffs.
1.340 + * Scans don't require a very high quality generator, but also not
1.341 + * a crummy one. Marsaglia xor-shift is cheap and works well
1.342 + * enough. Note: This is manually inlined in FJP.scan() to avoid
1.343 + * writes inside busy loops.
1.344 + */
1.345 + private int nextSeed() {
1.346 + int r = seed;
1.347 + r ^= r << 13;
1.348 + r ^= r >>> 17;
1.349 + r ^= r << 5;
1.350 + return seed = r;
1.351 + }
1.352 +
1.353 + // Run State management
1.354 +
1.355 + /**
1.356 + * Initializes internal state after construction but before
1.357 + * processing any tasks. If you override this method, you must
1.358 + * invoke {@code super.onStart()} at the beginning of the method.
1.359 + * Initialization requires care: Most fields must have legal
1.360 + * default values, to ensure that attempted accesses from other
1.361 + * threads work correctly even before this thread starts
1.362 + * processing tasks.
1.363 + */
1.364 + protected void onStart() {
1.365 + queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1.366 + int r = pool.workerSeedGenerator.nextInt();
1.367 + seed = (r == 0) ? 1 : r; // must be nonzero
1.368 + }
1.369 +
1.370 + /**
1.371 + * Performs cleanup associated with termination of this worker
1.372 + * thread. If you override this method, you must invoke
1.373 + * {@code super.onTermination} at the end of the overridden method.
1.374 + *
1.375 + * @param exception the exception causing this thread to abort due
1.376 + * to an unrecoverable error, or {@code null} if completed normally
1.377 + */
1.378 + protected void onTermination(Throwable exception) {
1.379 + try {
1.380 + terminate = true;
1.381 + cancelTasks();
1.382 + pool.deregisterWorker(this, exception);
1.383 + } catch (Throwable ex) { // Shouldn't ever happen
1.384 + if (exception == null) // but if so, at least rethrown
1.385 + exception = ex;
1.386 + } finally {
1.387 + if (exception != null)
1.388 + UNSAFE.throwException(exception);
1.389 + }
1.390 + }
1.391 +
1.392 + /**
1.393 + * This method is required to be public, but should never be
1.394 + * called explicitly. It performs the main run loop to execute
1.395 + * {@link ForkJoinTask}s.
1.396 + */
1.397 + public void run() {
1.398 + Throwable exception = null;
1.399 + try {
1.400 + onStart();
1.401 + pool.work(this);
1.402 + } catch (Throwable ex) {
1.403 + exception = ex;
1.404 + } finally {
1.405 + onTermination(exception);
1.406 + }
1.407 + }
1.408 +
1.409 + /*
1.410 + * Intrinsics-based atomic writes for queue slots. These are
1.411 + * basically the same as methods in AtomicReferenceArray, but
1.412 + * specialized for (1) ForkJoinTask elements (2) requirement that
1.413 + * nullness and bounds checks have already been performed by
1.414 + * callers and (3) effective offsets are known not to overflow
1.415 + * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
1.416 + * need corresponding version for reads: plain array reads are OK
1.417 + * because they are protected by other volatile reads and are
1.418 + * confirmed by CASes.
1.419 + *
1.420 + * Most uses don't actually call these methods, but instead
1.421 + * contain inlined forms that enable more predictable
1.422 + * optimization. We don't define the version of write used in
1.423 + * pushTask at all, but instead inline there a store-fenced array
1.424 + * slot write.
1.425 + *
1.426 + * Also in most methods, as a performance (not correctness) issue,
1.427 + * we'd like to encourage compilers not to arbitrarily postpone
1.428 + * setting queueTop after writing slot. Currently there is no
1.429 + * intrinsic for arranging this, but using Unsafe putOrderedInt
1.430 + * may be a preferable strategy on some compilers even though its
1.431 + * main effect is a pre-, not post- fence. To simplify possible
1.432 + * changes, the option is left in comments next to the associated
1.433 + * assignments.
1.434 + */
1.435 +
1.436 + /**
1.437 + * CASes slot i of array q from t to null. Caller must ensure q is
1.438 + * non-null and index is in range.
1.439 + */
1.440 + private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
1.441 + ForkJoinTask<?> t) {
1.442 + return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
1.443 + }
1.444 +
1.445 + /**
1.446 + * Performs a volatile write of the given task at given slot of
1.447 + * array q. Caller must ensure q is non-null and index is in
1.448 + * range. This method is used only during resets and backouts.
1.449 + */
1.450 + private static final void writeSlot(ForkJoinTask<?>[] q, int i,
1.451 + ForkJoinTask<?> t) {
1.452 + UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
1.453 + }
1.454 +
1.455 + // queue methods
1.456 +
1.457 + /**
1.458 + * Pushes a task. Call only from this thread.
1.459 + *
1.460 + * @param t the task. Caller must ensure non-null.
1.461 + */
1.462 + final void pushTask(ForkJoinTask<?> t) {
1.463 + ForkJoinTask<?>[] q; int s, m;
1.464 + if ((q = queue) != null) { // ignore if queue removed
1.465 + long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
1.466 + UNSAFE.putOrderedObject(q, u, t);
1.467 + queueTop = s + 1; // or use putOrderedInt
1.468 + if ((s -= queueBase) <= 2)
1.469 + pool.signalWork();
1.470 + else if (s == m)
1.471 + growQueue();
1.472 + }
1.473 + }
1.474 +
1.475 + /**
1.476 + * Creates or doubles queue array. Transfers elements by
1.477 + * emulating steals (deqs) from old array and placing, oldest
1.478 + * first, into new array.
1.479 + */
1.480 + private void growQueue() {
1.481 + ForkJoinTask<?>[] oldQ = queue;
1.482 + int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
1.483 + if (size > MAXIMUM_QUEUE_CAPACITY)
1.484 + throw new RejectedExecutionException("Queue capacity exceeded");
1.485 + if (size < INITIAL_QUEUE_CAPACITY)
1.486 + size = INITIAL_QUEUE_CAPACITY;
1.487 + ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
1.488 + int mask = size - 1;
1.489 + int top = queueTop;
1.490 + int oldMask;
1.491 + if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
1.492 + for (int b = queueBase; b != top; ++b) {
1.493 + long u = ((b & oldMask) << ASHIFT) + ABASE;
1.494 + Object x = UNSAFE.getObjectVolatile(oldQ, u);
1.495 + if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
1.496 + UNSAFE.putObjectVolatile
1.497 + (q, ((b & mask) << ASHIFT) + ABASE, x);
1.498 + }
1.499 + }
1.500 + }
1.501 +
1.502 + /**
1.503 + * Tries to take a task from the base of the queue, failing if
1.504 + * empty or contended. Note: Specializations of this code appear
1.505 + * in locallyDeqTask and elsewhere.
1.506 + *
1.507 + * @return a task, or null if none or contended
1.508 + */
1.509 + final ForkJoinTask<?> deqTask() {
1.510 + ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
1.511 + if (queueTop != (b = queueBase) &&
1.512 + (q = queue) != null && // must read q after b
1.513 + (i = (q.length - 1) & b) >= 0 &&
1.514 + (t = q[i]) != null && queueBase == b &&
1.515 + UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
1.516 + queueBase = b + 1;
1.517 + return t;
1.518 + }
1.519 + return null;
1.520 + }
1.521 +
1.522 + /**
1.523 + * Tries to take a task from the base of own queue. Called only
1.524 + * by this thread.
1.525 + *
1.526 + * @return a task, or null if none
1.527 + */
1.528 + final ForkJoinTask<?> locallyDeqTask() {
1.529 + ForkJoinTask<?> t; int m, b, i;
1.530 + ForkJoinTask<?>[] q = queue;
1.531 + if (q != null && (m = q.length - 1) >= 0) {
1.532 + while (queueTop != (b = queueBase)) {
1.533 + if ((t = q[i = m & b]) != null &&
1.534 + queueBase == b &&
1.535 + UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
1.536 + t, null)) {
1.537 + queueBase = b + 1;
1.538 + return t;
1.539 + }
1.540 + }
1.541 + }
1.542 + return null;
1.543 + }
1.544 +
1.545 + /**
1.546 + * Returns a popped task, or null if empty.
1.547 + * Called only by this thread.
1.548 + */
1.549 + private ForkJoinTask<?> popTask() {
1.550 + int m;
1.551 + ForkJoinTask<?>[] q = queue;
1.552 + if (q != null && (m = q.length - 1) >= 0) {
1.553 + for (int s; (s = queueTop) != queueBase;) {
1.554 + int i = m & --s;
1.555 + long u = (i << ASHIFT) + ABASE; // raw offset
1.556 + ForkJoinTask<?> t = q[i];
1.557 + if (t == null) // lost to stealer
1.558 + break;
1.559 + if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
1.560 + queueTop = s; // or putOrderedInt
1.561 + return t;
1.562 + }
1.563 + }
1.564 + }
1.565 + return null;
1.566 + }
1.567 +
1.568 + /**
1.569 + * Specialized version of popTask to pop only if topmost element
1.570 + * is the given task. Called only by this thread.
1.571 + *
1.572 + * @param t the task. Caller must ensure non-null.
1.573 + */
1.574 + final boolean unpushTask(ForkJoinTask<?> t) {
1.575 + ForkJoinTask<?>[] q;
1.576 + int s;
1.577 + if ((q = queue) != null && (s = queueTop) != queueBase &&
1.578 + UNSAFE.compareAndSwapObject
1.579 + (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
1.580 + queueTop = s; // or putOrderedInt
1.581 + return true;
1.582 + }
1.583 + return false;
1.584 + }
1.585 +
1.586 + /**
1.587 + * Returns next task, or null if empty or contended.
1.588 + */
1.589 + final ForkJoinTask<?> peekTask() {
1.590 + int m;
1.591 + ForkJoinTask<?>[] q = queue;
1.592 + if (q == null || (m = q.length - 1) < 0)
1.593 + return null;
1.594 + int i = locallyFifo ? queueBase : (queueTop - 1);
1.595 + return q[i & m];
1.596 + }
1.597 +
1.598 + // Support methods for ForkJoinPool
1.599 +
1.600 + /**
1.601 + * Runs the given task, plus any local tasks until queue is empty
1.602 + */
1.603 + final void execTask(ForkJoinTask<?> t) {
1.604 + currentSteal = t;
1.605 + for (;;) {
1.606 + if (t != null)
1.607 + t.doExec();
1.608 + if (queueTop == queueBase)
1.609 + break;
1.610 + t = locallyFifo ? locallyDeqTask() : popTask();
1.611 + }
1.612 + ++stealCount;
1.613 + currentSteal = null;
1.614 + }
1.615 +
1.616 + /**
1.617 + * Removes and cancels all tasks in queue. Can be called from any
1.618 + * thread.
1.619 + */
1.620 + final void cancelTasks() {
1.621 + ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
1.622 + if (cj != null && cj.status >= 0)
1.623 + cj.cancelIgnoringExceptions();
1.624 + ForkJoinTask<?> cs = currentSteal;
1.625 + if (cs != null && cs.status >= 0)
1.626 + cs.cancelIgnoringExceptions();
1.627 + while (queueBase != queueTop) {
1.628 + ForkJoinTask<?> t = deqTask();
1.629 + if (t != null)
1.630 + t.cancelIgnoringExceptions();
1.631 + }
1.632 + }
1.633 +
1.634 + /**
1.635 + * Drains tasks to given collection c.
1.636 + *
1.637 + * @return the number of tasks drained
1.638 + */
1.639 + final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1.640 + int n = 0;
1.641 + while (queueBase != queueTop) {
1.642 + ForkJoinTask<?> t = deqTask();
1.643 + if (t != null) {
1.644 + c.add(t);
1.645 + ++n;
1.646 + }
1.647 + }
1.648 + return n;
1.649 + }
1.650 +
1.651 + // Support methods for ForkJoinTask
1.652 +
1.653 + /**
1.654 + * Returns an estimate of the number of tasks in the queue.
1.655 + */
1.656 + final int getQueueSize() {
1.657 + return queueTop - queueBase;
1.658 + }
1.659 +
1.660 + /**
1.661 + * Gets and removes a local task.
1.662 + *
1.663 + * @return a task, if available
1.664 + */
1.665 + final ForkJoinTask<?> pollLocalTask() {
1.666 + return locallyFifo ? locallyDeqTask() : popTask();
1.667 + }
1.668 +
1.669 + /**
1.670 + * Gets and removes a local or stolen task.
1.671 + *
1.672 + * @return a task, if available
1.673 + */
1.674 + final ForkJoinTask<?> pollTask() {
1.675 + ForkJoinWorkerThread[] ws;
1.676 + ForkJoinTask<?> t = pollLocalTask();
1.677 + if (t != null || (ws = pool.workers) == null)
1.678 + return t;
1.679 + int n = ws.length; // cheap version of FJP.scan
1.680 + int steps = n << 1;
1.681 + int r = nextSeed();
1.682 + int i = 0;
1.683 + while (i < steps) {
1.684 + ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
1.685 + if (w != null && w.queueBase != w.queueTop && w.queue != null) {
1.686 + if ((t = w.deqTask()) != null)
1.687 + return t;
1.688 + i = 0;
1.689 + }
1.690 + }
1.691 + return null;
1.692 + }
1.693 +
1.694 + /**
1.695 + * The maximum stolen->joining link depth allowed in helpJoinTask,
1.696 + * as well as the maximum number of retries (allowing on average
1.697 + * one staleness retry per level) per attempt to instead try
1.698 + * compensation. Depths for legitimate chains are unbounded, but
1.699 + * we use a fixed constant to avoid (otherwise unchecked) cycles
1.700 + * and bound staleness of traversal parameters at the expense of
1.701 + * sometimes blocking when we could be helping.
1.702 + */
1.703 + private static final int MAX_HELP = 16;
1.704 +
1.705 + /**
1.706 + * Possibly runs some tasks and/or blocks, until joinMe is done.
1.707 + *
1.708 + * @param joinMe the task to join
1.709 + * @return completion status on exit
1.710 + */
1.711 + final int joinTask(ForkJoinTask<?> joinMe) {
1.712 + ForkJoinTask<?> prevJoin = currentJoin;
1.713 + currentJoin = joinMe;
1.714 + for (int s, retries = MAX_HELP;;) {
1.715 + if ((s = joinMe.status) < 0) {
1.716 + currentJoin = prevJoin;
1.717 + return s;
1.718 + }
1.719 + if (retries > 0) {
1.720 + if (queueTop != queueBase) {
1.721 + if (!localHelpJoinTask(joinMe))
1.722 + retries = 0; // cannot help
1.723 + }
1.724 + else if (retries == MAX_HELP >>> 1) {
1.725 + --retries; // check uncommon case
1.726 + if (tryDeqAndExec(joinMe) >= 0)
1.727 + Thread.yield(); // for politeness
1.728 + }
1.729 + else
1.730 + retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
1.731 + }
1.732 + else {
1.733 + retries = MAX_HELP; // restart if not done
1.734 + pool.tryAwaitJoin(joinMe);
1.735 + }
1.736 + }
1.737 + }
1.738 +
1.739 + /**
1.740 + * If present, pops and executes the given task, or any other
1.741 + * cancelled task
1.742 + *
1.743 + * @return false if any other non-cancelled task exists in local queue
1.744 + */
1.745 + private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
1.746 + int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
1.747 + if ((s = queueTop) != queueBase && (q = queue) != null &&
1.748 + (i = (q.length - 1) & --s) >= 0 &&
1.749 + (t = q[i]) != null) {
1.750 + if (t != joinMe && t.status >= 0)
1.751 + return false;
1.752 + if (UNSAFE.compareAndSwapObject
1.753 + (q, (i << ASHIFT) + ABASE, t, null)) {
1.754 + queueTop = s; // or putOrderedInt
1.755 + t.doExec();
1.756 + }
1.757 + }
1.758 + return true;
1.759 + }
1.760 +
1.761 + /**
1.762 + * Tries to locate and execute tasks for a stealer of the given
1.763 + * task, or in turn one of its stealers, Traces
1.764 + * currentSteal->currentJoin links looking for a thread working on
1.765 + * a descendant of the given task and with a non-empty queue to
1.766 + * steal back and execute tasks from. The implementation is very
1.767 + * branchy to cope with potential inconsistencies or loops
1.768 + * encountering chains that are stale, unknown, or of length
1.769 + * greater than MAX_HELP links. All of these cases are dealt with
1.770 + * by just retrying by caller.
1.771 + *
1.772 + * @param joinMe the task to join
1.773 + * @param canSteal true if local queue is empty
1.774 + * @return true if ran a task
1.775 + */
1.776 + private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
1.777 + boolean helped = false;
1.778 + int m = pool.scanGuard & SMASK;
1.779 + ForkJoinWorkerThread[] ws = pool.workers;
1.780 + if (ws != null && ws.length > m && joinMe.status >= 0) {
1.781 + int levels = MAX_HELP; // remaining chain length
1.782 + ForkJoinTask<?> task = joinMe; // base of chain
1.783 + outer:for (ForkJoinWorkerThread thread = this;;) {
1.784 + // Try to find v, the stealer of task, by first using hint
1.785 + ForkJoinWorkerThread v = ws[thread.stealHint & m];
1.786 + if (v == null || v.currentSteal != task) {
1.787 + for (int j = 0; ;) { // search array
1.788 + if ((v = ws[j]) != null && v.currentSteal == task) {
1.789 + thread.stealHint = j;
1.790 + break; // save hint for next time
1.791 + }
1.792 + if (++j > m)
1.793 + break outer; // can't find stealer
1.794 + }
1.795 + }
1.796 + // Try to help v, using specialized form of deqTask
1.797 + for (;;) {
1.798 + ForkJoinTask<?>[] q; int b, i;
1.799 + if (joinMe.status < 0)
1.800 + break outer;
1.801 + if ((b = v.queueBase) == v.queueTop ||
1.802 + (q = v.queue) == null ||
1.803 + (i = (q.length-1) & b) < 0)
1.804 + break; // empty
1.805 + long u = (i << ASHIFT) + ABASE;
1.806 + ForkJoinTask<?> t = q[i];
1.807 + if (task.status < 0)
1.808 + break outer; // stale
1.809 + if (t != null && v.queueBase == b &&
1.810 + UNSAFE.compareAndSwapObject(q, u, t, null)) {
1.811 + v.queueBase = b + 1;
1.812 + v.stealHint = poolIndex;
1.813 + ForkJoinTask<?> ps = currentSteal;
1.814 + currentSteal = t;
1.815 + t.doExec();
1.816 + currentSteal = ps;
1.817 + helped = true;
1.818 + }
1.819 + }
1.820 + // Try to descend to find v's stealer
1.821 + ForkJoinTask<?> next = v.currentJoin;
1.822 + if (--levels > 0 && task.status >= 0 &&
1.823 + next != null && next != task) {
1.824 + task = next;
1.825 + thread = v;
1.826 + }
1.827 + else
1.828 + break; // max levels, stale, dead-end, or cyclic
1.829 + }
1.830 + }
1.831 + return helped;
1.832 + }
1.833 +
1.834 + /**
1.835 + * Performs an uncommon case for joinTask: If task t is at base of
1.836 + * some workers queue, steals and executes it.
1.837 + *
1.838 + * @param t the task
1.839 + * @return t's status
1.840 + */
1.841 + private int tryDeqAndExec(ForkJoinTask<?> t) {
1.842 + int m = pool.scanGuard & SMASK;
1.843 + ForkJoinWorkerThread[] ws = pool.workers;
1.844 + if (ws != null && ws.length > m && t.status >= 0) {
1.845 + for (int j = 0; j <= m; ++j) {
1.846 + ForkJoinTask<?>[] q; int b, i;
1.847 + ForkJoinWorkerThread v = ws[j];
1.848 + if (v != null &&
1.849 + (b = v.queueBase) != v.queueTop &&
1.850 + (q = v.queue) != null &&
1.851 + (i = (q.length - 1) & b) >= 0 &&
1.852 + q[i] == t) {
1.853 + long u = (i << ASHIFT) + ABASE;
1.854 + if (v.queueBase == b &&
1.855 + UNSAFE.compareAndSwapObject(q, u, t, null)) {
1.856 + v.queueBase = b + 1;
1.857 + v.stealHint = poolIndex;
1.858 + ForkJoinTask<?> ps = currentSteal;
1.859 + currentSteal = t;
1.860 + t.doExec();
1.861 + currentSteal = ps;
1.862 + }
1.863 + break;
1.864 + }
1.865 + }
1.866 + }
1.867 + return t.status;
1.868 + }
1.869 +
1.870 + /**
1.871 + * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns
1.872 + * an estimate of the number of tasks, offset by a function of
1.873 + * number of idle workers.
1.874 + *
1.875 + * This method provides a cheap heuristic guide for task
1.876 + * partitioning when programmers, frameworks, tools, or languages
1.877 + * have little or no idea about task granularity. In essence by
1.878 + * offering this method, we ask users only about tradeoffs in
1.879 + * overhead vs expected throughput and its variance, rather than
1.880 + * how finely to partition tasks.
1.881 + *
1.882 + * In a steady state strict (tree-structured) computation, each
1.883 + * thread makes available for stealing enough tasks for other
1.884 + * threads to remain active. Inductively, if all threads play by
1.885 + * the same rules, each thread should make available only a
1.886 + * constant number of tasks.
1.887 + *
1.888 + * The minimum useful constant is just 1. But using a value of 1
1.889 + * would require immediate replenishment upon each steal to
1.890 + * maintain enough tasks, which is infeasible. Further,
1.891 + * partitionings/granularities of offered tasks should minimize
1.892 + * steal rates, which in general means that threads nearer the top
1.893 + * of computation tree should generate more than those nearer the
1.894 + * bottom. In perfect steady state, each thread is at
1.895 + * approximately the same level of computation tree. However,
1.896 + * producing extra tasks amortizes the uncertainty of progress and
1.897 + * diffusion assumptions.
1.898 + *
1.899 + * So, users will want to use values larger, but not much larger
1.900 + * than 1 to both smooth over transient shortages and hedge
1.901 + * against uneven progress; as traded off against the cost of
1.902 + * extra task overhead. We leave the user to pick a threshold
1.903 + * value to compare with the results of this call to guide
1.904 + * decisions, but recommend values such as 3.
1.905 + *
1.906 + * When all threads are active, it is on average OK to estimate
1.907 + * surplus strictly locally. In steady-state, if one thread is
1.908 + * maintaining say 2 surplus tasks, then so are others. So we can
1.909 + * just use estimated queue length (although note that (queueTop -
1.910 + * queueBase) can be an overestimate because of stealers lagging
1.911 + * increments of queueBase). However, this strategy alone leads
1.912 + * to serious mis-estimates in some non-steady-state conditions
1.913 + * (ramp-up, ramp-down, other stalls). We can detect many of these
1.914 + * by further considering the number of "idle" threads, that are
1.915 + * known to have zero queued tasks, so compensate by a factor of
1.916 + * (#idle/#active) threads.
1.917 + */
1.918 + final int getEstimatedSurplusTaskCount() {
1.919 + return queueTop - queueBase - pool.idlePerActive();
1.920 + }
1.921 +
1.922 + /**
1.923 + * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
1.924 + * pool's active count ctl maintenance, but rather than blocking
1.925 + * when tasks cannot be found, we rescan until all others cannot
1.926 + * find tasks either. The bracketing by pool quiescerCounts
1.927 + * updates suppresses pool auto-shutdown mechanics that could
1.928 + * otherwise prematurely terminate the pool because all threads
1.929 + * appear to be inactive.
1.930 + */
1.931 + final void helpQuiescePool() {
1.932 + boolean active = true;
1.933 + ForkJoinTask<?> ps = currentSteal; // to restore below
1.934 + ForkJoinPool p = pool;
1.935 + p.addQuiescerCount(1);
1.936 + for (;;) {
1.937 + ForkJoinWorkerThread[] ws = p.workers;
1.938 + ForkJoinWorkerThread v = null;
1.939 + int n;
1.940 + if (queueTop != queueBase)
1.941 + v = this;
1.942 + else if (ws != null && (n = ws.length) > 1) {
1.943 + ForkJoinWorkerThread w;
1.944 + int r = nextSeed(); // cheap version of FJP.scan
1.945 + int steps = n << 1;
1.946 + for (int i = 0; i < steps; ++i) {
1.947 + if ((w = ws[(i + r) & (n - 1)]) != null &&
1.948 + w.queueBase != w.queueTop) {
1.949 + v = w;
1.950 + break;
1.951 + }
1.952 + }
1.953 + }
1.954 + if (v != null) {
1.955 + ForkJoinTask<?> t;
1.956 + if (!active) {
1.957 + active = true;
1.958 + p.addActiveCount(1);
1.959 + }
1.960 + if ((t = (v != this) ? v.deqTask() :
1.961 + locallyFifo ? locallyDeqTask() : popTask()) != null) {
1.962 + currentSteal = t;
1.963 + t.doExec();
1.964 + currentSteal = ps;
1.965 + }
1.966 + }
1.967 + else {
1.968 + if (active) {
1.969 + active = false;
1.970 + p.addActiveCount(-1);
1.971 + }
1.972 + if (p.isQuiescent()) {
1.973 + p.addActiveCount(1);
1.974 + p.addQuiescerCount(-1);
1.975 + break;
1.976 + }
1.977 + }
1.978 + }
1.979 + }
1.980 +
1.981 + // Unsafe mechanics
1.982 + private static final sun.misc.Unsafe UNSAFE;
1.983 + private static final long ABASE;
1.984 + private static final int ASHIFT;
1.985 +
1.986 + static {
1.987 + int s;
1.988 + try {
1.989 + UNSAFE = sun.misc.Unsafe.getUnsafe();
1.990 + Class a = ForkJoinTask[].class;
1.991 + ABASE = UNSAFE.arrayBaseOffset(a);
1.992 + s = UNSAFE.arrayIndexScale(a);
1.993 + } catch (Exception e) {
1.994 + throw new Error(e);
1.995 + }
1.996 + if ((s & (s-1)) != 0)
1.997 + throw new Error("data type scale not a power of two");
1.998 + ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
1.999 + }
1.1000 +
1.1001 +}