Making java.util.concurrent compilable without references to sun.misc.Unsafe and co.
/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
package java.util.concurrent;

import java.util.Collection;
import java.util.concurrent.RejectedExecutionException;
/**
 * A thread managed by a {@link ForkJoinPool}, which executes
 * {@link ForkJoinTask}s.
 * This class is subclassable solely for the sake of adding
 * functionality -- there are no overridable methods dealing with
 * scheduling or execution.  However, you can override initialization
 * and termination methods surrounding the main task processing loop.
 * If you do create such a subclass, you will also need to supply a
 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
 * in a {@code ForkJoinPool}.
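 *
 * <p>As an illustration (a minimal sketch; the {@code TimedWorker}
 * names below are hypothetical, not part of this API), such a
 * subclass and its factory might look like:
 *
 * <pre> {@code
 * class TimedWorker extends ForkJoinWorkerThread {
 *   long startTime;
 *   TimedWorker(ForkJoinPool pool) { super(pool); }
 *   protected void onStart() {
 *     super.onStart();               // must run first
 *     startTime = System.nanoTime(); // per-thread setup
 *   }
 *   protected void onTermination(Throwable exception) {
 *     System.out.println(getPoolIndex() + " ran " +
 *                        (System.nanoTime() - startTime) + "ns");
 *     super.onTermination(exception); // must run last
 *   }
 * }
 *
 * class TimedWorkerFactory
 *     implements ForkJoinPool.ForkJoinWorkerThreadFactory {
 *   public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *     return new TimedWorker(pool);
 *   }
 * }
 *
 * ForkJoinPool pool = new ForkJoinPool(
 *     Runtime.getRuntime().availableProcessors(),
 *     new TimedWorkerFactory(), null, false);}</pre>
 *
 * @since 1.7
 * @author Doug Lea
 */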
public class ForkJoinWorkerThread extends Thread {
    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads. (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor Programming", chapter 16, describing these
     * in more detail before proceeding.) The main work-stealing
     * queue design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("queueBase" and "queueTop") to the slots themselves (mainly
     * via method "casSlotNull()"). So, both a successful pop and deq
     * mainly entail a CAS of a slot from non-null to null. Because
     * we rely on CASes of references, we do not need tag bits on
     * queueBase or queueTop. They are simple ints as used in any
     * circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that queueTop == queueBase means the queue is empty,
     * but otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq has not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete.
     *
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO, order, simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool). This allows use in message-passing
     * frameworks in which tasks are never joined. However, neither
     * mode considers affinities, loads, cache localities, etc., so
     * the modes rarely provide the best possible performance on a
     * given machine, but they portably provide good throughput by
     * averaging over these factors. (Further, even if we did try to
     * use such information, we do not usually have a basis for
     * exploiting it. For example, some sets of tasks profit from
     * cache affinities, but others are harmed by cache pollution
     * effects.)
     *
     * When a worker would otherwise be blocked waiting to join a
     * task, it first tries a form of linear helping: Each worker
     * records (in field currentSteal) the most recent task it stole
     * from some other worker. Plus, it records (in field currentJoin)
     * the task it is currently actively joining. Method joinTask uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task. In essence, the joiner executes a task
     * that would be on its own local deque had the to-be-joined task
     * not been stolen. This may be seen as a conservative variant of
     * the approach in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * in that: (1) We only maintain dependency links across workers
     * upon steals, rather than use per-task bookkeeping. This may
     * require a linear scan of the workers array to locate stealers,
     * but usually doesn't, because stealers leave hints (that may
     * become stale/wrong) of where to locate them. This isolates cost
     * to when it is needed, rather than adding to per-task overhead.
     * (2) It is "shallow", ignoring nesting and potentially cyclic
     * mutual steals. (3) It is intentionally racy: field currentJoin
     * is updated only while actively joining, which means that we
     * miss links in the chain during long-lived tasks, GC stalls, etc.
     * (which is OK since blocking in such cases is usually a good
     * idea). (4) We bound the number of attempts to find work (see
     * MAX_HELP) and fall back to suspending the worker and, if
     * necessary, replacing it with another.
     *
     * Efficient implementation of these algorithms currently relies
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable queueBase
     * require volatile ordering. Variable queueTop need not be
     * volatile because non-local reads always follow those of
     * queueBase. Similarly, because they are protected by volatile
     * queueBase reads, reads of the queue array and its slots by
     * other threads do not need volatile load semantics, but writes
     * (in push) require store order and CASes (in pop and deq)
     * require (volatile) CAS semantics. (Michael, Saraswat, and
     * Vechev's algorithm has similar properties, but without support
     * for nulling slots.) Since these combinations aren't supported
     * using ordinary volatiles, the only way to accomplish them
     * efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * the placement and sizing of the (resizable) queue array. Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting.
     */
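
    /*
     * For orientation only: a minimal sketch of the slot-CAS scheme
     * described above, phrased with AtomicReferenceArray instead of
     * Unsafe. The names are illustrative and this form is NOT used
     * below; as noted above, the extra indirection makes it
     * significantly slower than the direct Unsafe calls.
     *
     *   class SketchDeque {
     *     final AtomicReferenceArray<ForkJoinTask<?>> slots =
     *       new AtomicReferenceArray<ForkJoinTask<?>>(1 << 13);
     *     volatile int base;  // next slot to steal (deq) from
     *     int top;            // next slot to push to; owner-written only
     *
     *     void push(ForkJoinTask<?> t) {           // owner only
     *       slots.set(top & (slots.length() - 1), t);
     *       top = top + 1;          // index update follows slot write
     *     }
     *     ForkJoinTask<?> pop() {                  // owner only
     *       while (top != base) {
     *         int s = top - 1, i = s & (slots.length() - 1);
     *         ForkJoinTask<?> t = slots.get(i);
     *         if (t == null)
     *           break;                             // lost to a stealer
     *         if (slots.compareAndSet(i, t, null)) { // CAS slot, not index
     *           top = s;
     *           return t;
     *         }
     *       }
     *       return null;
     *     }
     *     ForkJoinTask<?> deq() {                  // any thread (steal)
     *       int b = base, i = b & (slots.length() - 1);
     *       ForkJoinTask<?> t = slots.get(i);
     *       if (t != null && base == b && slots.compareAndSet(i, t, null)) {
     *         base = b + 1;                        // commit the steal
     *         return t;
     *       }
     *       return null;                           // empty or contended
     *     }
     *   }
     */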

    /**
     * Mask for pool indices encoded as shorts
     */
    private static final int SMASK = 0xffff;

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum size for queue array. Must be a power of two
     * less than or equal to 1 << (31 - width of array entry) to
     * ensure lack of index wraparound, but is capped at a lower
     * value to help users trap runaway computations.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
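
    /*
     * Arithmetic note (values illustrative; they depend on the VM):
     * with 4-byte array entries, ASHIFT is 2, so index byte offsets
     * stay within int range for capacities up to about 1 << 29; the
     * 1 << 24 cap used here is far below that bound and exists mainly
     * to trap runaway task generation.
     */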

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when started (as opposed to when constructed), to
     * improve memory locality.
     */
    ForkJoinTask<?>[] queue;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) queueBase. Both queueTop
     * and queueBase are allowed to wrap around on overflow, but
     * (queueTop - queueBase) still estimates size.
     */
    int queueTop;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    volatile int queueBase;

    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    int stealHint;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    final int poolIndex;

    /**
     * Encoded record for pool task waits. Usages are always
     * surrounded by volatile reads/writes.
     */
    int nextWait;

    /**
     * Complement of poolIndex, offset by count of entries of task
     * waits. Accessed by ForkJoinPool to manage event waiters.
     */
    volatile int eventCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    int seed;

    /**
     * Number of steals. Directly accessed (and reset) by pool when
     * idle.
     */
    int stealCount;

    /**
     * True if this worker should or did terminate.
     */
    volatile boolean terminate;

    /**
     * Set to true before LockSupport.park; false on return.
     */
    volatile boolean parked;

    /**
     * True if using local FIFO, not the default LIFO, for local
     * polling. Shadows the value from ForkJoinPool.
     */
    final boolean locallyFifo;

    /**
     * The task most recently stolen from another worker (or
     * submission queue). All uses are surrounded by enough volatile
     * reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentSteal;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. All uses are surrounded
     * by enough volatile reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentJoin;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool.nextWorkerName());
        this.pool = pool;
        int k = pool.registerWorker(this);
        poolIndex = k;
        eventCount = ~k & SMASK; // clear wait count
        locallyFifo = pool.locallyFifo;
        Thread.UncaughtExceptionHandler ueh = pool.ueh;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        setDaemon(true);
    }

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool. The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    /**
     * Computes next value for random victim probes and backoffs.
     * Scans don't require a very high quality generator, but also not
     * a crummy one. Marsaglia xor-shift is cheap and works well
     * enough. Note: This is manually inlined in FJP.scan() to avoid
     * writes inside busy loops.
     */
    private int nextSeed() {
        int r = seed;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return seed = r;
    }

    // Run State management

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        int r = pool.workerSeedGenerator.nextInt();
        seed = (r == 0) ? 1 : r; // must be nonzero
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        try {
            terminate = true;
            cancelTasks();
            pool.deregisterWorker(this, exception);
        } catch (Throwable ex) {       // Shouldn't ever happen
            if (exception == null)     // but if so, at least rethrown
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * {@link ForkJoinTask}s.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.work(this);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    /*
     * Intrinsics-based atomic writes for queue slots. These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements, (2) the requirement
     * that nullness and bounds checks have already been performed by
     * callers, and (3) effective offsets known not to overflow from
     * int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't need
     * corresponding versions for reads: plain array reads are OK
     * because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead
     * contain inlined forms that enable more predictable
     * optimization. We don't define the version of write used in
     * pushTask at all, but instead inline there a store-fenced array
     * slot write.
     *
     * Also in most methods, as a performance (not correctness) issue,
     * we'd like to encourage compilers not to arbitrarily postpone
     * setting queueTop after writing slot. Currently there is no
     * intrinsic for arranging this, but using Unsafe putOrderedInt
     * may be a preferable strategy on some compilers even though its
     * main effect is a pre-, not post-, fence. To simplify possible
     * changes, the option is left in comments next to the associated
     * calls.
     */
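
    /*
     * Worked example of the offset arithmetic used below (the
     * concrete values are illustrative; they depend on the VM): with
     * compressed oops, arrayIndexScale reports 4 bytes per reference,
     * so ASHIFT = 31 - numberOfLeadingZeros(4) = 2, and if
     * arrayBaseOffset reports ABASE = 16, slot i of a queue array
     * lives at raw offset (i << 2) + 16. For i = 5 that is
     * 5*4 + 16 = 36 bytes from the start of the array object, the
     * same location a plain q[5] access would read.
     */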

    /**
     * CASes slot i of array q from t to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                             ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
    }

    /**
     * Performs a volatile write of the given task at given slot of
     * array q. Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
    }

    // Main queue methods

    /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

    /**
     * Creates or doubles queue array. Transfers elements by
     * emulating steals (deqs) from old array and placing, oldest
     * first, into new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        if (queueTop != (b = queueBase) &&
            (q = queue) != null && // must read q after b
            (i = (q.length - 1) & b) >= 0 &&
            (t = q[i]) != null && queueBase == b &&
            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
            queueBase = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Tries to take a task from the base of own queue. Called only
     * by this thread.
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?> t; int m, b, i;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            while (queueTop != (b = queueBase)) {
                if ((t = q[i = m & b]) != null &&
                    queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
                                                t, null)) {
                    queueBase = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty.
     * Called only by this thread.
     */
    private ForkJoinTask<?> popTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            for (int s; (s = queueTop) != queueBase;) {
                int i = m & --s;
                long u = (i << ASHIFT) + ABASE; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)   // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueTop = s; // or putOrderedInt
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q;
        int s;
        if ((q = queue) != null && (s = queueTop) != queueBase &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            queueTop = s; // or putOrderedInt
            return true;
        }
        return false;
    }

    /**
     * Returns next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q == null || (m = q.length - 1) < 0)
            return null;
        int i = locallyFifo ? queueBase : (queueTop - 1);
        return q[i & m];
    }

    // Support methods for ForkJoinPool

    /**
     * Runs the given task, plus any local tasks, until queue is empty.
     */
    final void execTask(ForkJoinTask<?> t) {
        currentSteal = t;
        for (;;) {
            if (t != null)
                t.doExec();
            if (queueTop == queueBase)
                break;
            t = locallyFifo ? locallyDeqTask() : popTask();
        }
        ++stealCount;
        currentSteal = null;
    }

    /**
     * Removes and cancels all tasks in queue. Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
        if (cj != null && cj.status >= 0)
            cj.cancelIgnoringExceptions();
        ForkJoinTask<?> cs = currentSteal;
        if (cs != null && cs.status >= 0)
            cs.cancelIgnoringExceptions();
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }

    // Support methods for ForkJoinTask

    /**
     * Returns an estimate of the number of tasks in the queue.
     */
    final int getQueueSize() {
        return queueTop - queueBase;
    }

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        return locallyFifo ? locallyDeqTask() : popTask();
    }

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinWorkerThread[] ws;
        ForkJoinTask<?> t = pollLocalTask();
        if (t != null || (ws = pool.workers) == null)
            return t;
        int n = ws.length; // cheap version of FJP.scan
        int steps = n << 1;
        int r = nextSeed();
        int i = 0;
        while (i < steps) {
            ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
            if (w != null && w.queueBase != w.queueTop && w.queue != null) {
                if ((t = w.deqTask()) != null)
                    return t;
            }
        }
        return null;
    }

    /**
     * The maximum stolen->joining link depth allowed in helpJoinTask,
     * as well as the maximum number of retries (allowing on average
     * one staleness retry per level) per attempt to instead try
     * compensation. Depths for legitimate chains are unbounded, but
     * we use a fixed constant to avoid (otherwise unchecked) cycles
     * and bound staleness of traversal parameters at the expense of
     * sometimes blocking when we could be helping.
     */
    private static final int MAX_HELP = 16;

    /**
     * Possibly runs some tasks and/or blocks, until joinMe is done.
     *
     * @param joinMe the task to join
     * @return completion status on exit
     */
    final int joinTask(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> prevJoin = currentJoin;
        currentJoin = joinMe;
        for (int s, retries = MAX_HELP;;) {
            if ((s = joinMe.status) < 0) {
                currentJoin = prevJoin;
                return s;
            }
            if (retries > 0) {
                if (queueTop != queueBase) {
                    if (!localHelpJoinTask(joinMe))
                        retries = 0;           // cannot help
                }
                else if (retries == MAX_HELP >>> 1) {
                    --retries;                 // check uncommon case
                    if (tryDeqAndExec(joinMe) >= 0)
                        Thread.yield();        // for politeness
                }
                else
                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
            }
            else {
                retries = MAX_HELP;            // restart if not done
                pool.tryAwaitJoin(joinMe);
            }
        }
    }

    /**
     * If present, pops and executes the given task, or any other
     * cancelled task.
     *
     * @return false if any other non-cancelled task exists in local queue
     */
    private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
        int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
        if ((s = queueTop) != queueBase && (q = queue) != null &&
            (i = (q.length - 1) & --s) >= 0 &&
            (t = q[i]) != null) {
            if (t != joinMe && t.status >= 0)
                return false;
            if (UNSAFE.compareAndSwapObject
                (q, (i << ASHIFT) + ABASE, t, null)) {
                queueTop = s;           // or putOrderedInt
                t.doExec();
            }
        }
        return true;
    }

    /**
     * Tries to locate and execute tasks for a stealer of the given
     * task, or in turn one of its stealers. Traces
     * currentSteal->currentJoin links looking for a thread working on
     * a descendant of the given task and with a non-empty queue to
     * steal back and execute tasks from. The implementation is very
     * branchy to cope with potential inconsistencies or loops
     * encountering chains that are stale, unknown, or of length
     * greater than MAX_HELP links. All of these cases are dealt with
     * by just retrying by caller.
     *
     * @param joinMe the task to join
     * @return true if ran a task
     */
    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
        boolean helped = false;
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && joinMe.status >= 0) {
            int levels = MAX_HELP;              // remaining chain length
            ForkJoinTask<?> task = joinMe;      // base of chain
            outer:for (ForkJoinWorkerThread thread = this;;) {
                // Try to find v, the stealer of task, by first using hint
                ForkJoinWorkerThread v = ws[thread.stealHint & m];
                if (v == null || v.currentSteal != task) {
                    for (int j = 0; ;) {        // search array
                        if ((v = ws[j]) != null && v.currentSteal == task) {
                            thread.stealHint = j;
                            break;              // save hint for next time
                        }
                        if (++j > m)
                            break outer;        // can't find stealer
                    }
                }
                // Try to help v, using specialized form of deqTask
                for (;;) {
                    ForkJoinTask<?>[] q; int b, i;
                    if (joinMe.status < 0)
                        break outer;
                    if ((b = v.queueBase) == v.queueTop ||
                        (q = v.queue) == null ||
                        (i = (q.length-1) & b) < 0)
                        break;                  // empty
                    long u = (i << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = q[i];
                    if (task.status < 0)
                        break outer;            // stale
                    if (t != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                        helped = true;
                    }
                }
                // Try to descend to find v's stealer
                ForkJoinTask<?> next = v.currentJoin;
                if (--levels > 0 && task.status >= 0 &&
                    next != null && next != task) {
                    task = next;
                    thread = v;
                }
                else
                    break;  // max levels, stale, dead-end, or cyclic
            }
        }
        return helped;
    }

    /**
     * Performs an uncommon case for joinTask: If task t is at base of
     * some worker's queue, steals and executes it.
     *
     * @param t the task to join
     * @return t's status
     */
    private int tryDeqAndExec(ForkJoinTask<?> t) {
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && t.status >= 0) {
            for (int j = 0; j <= m; ++j) {
                ForkJoinTask<?>[] q; int b, i;
                ForkJoinWorkerThread v = ws[j];
                if (v != null &&
                    (b = v.queueBase) != v.queueTop &&
                    (q = v.queue) != null &&
                    (i = (q.length - 1) & b) >= 0 &&
                    q[i] == t) {
                    long u = (i << ASHIFT) + ABASE;
                    if (v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                    }
                    break;
                }
            }
        }
        return t.status;
    }

    /**
     * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns
     * an estimate of the number of tasks, offset by a function of
     * number of idle workers.
     *
     * This method provides a cheap heuristic guide for task
     * partitioning when programmers, frameworks, tools, or languages
     * have little or no idea about task granularity. In essence, by
     * offering this method, we ask users only about tradeoffs in
     * overhead vs expected throughput and its variance, rather than
     * how finely to partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of the computation tree should generate more than those nearer
     * the bottom. In perfect steady state, each thread is at
     * approximately the same level of the computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger, but not much larger,
     * than 1, both to smooth over transient shortages and to hedge
     * against uneven progress, as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length (although note that (queueTop -
     * queueBase) can be an overestimate because of stealers lagging
     * increments of queueBase). However, this strategy alone leads
     * to serious mis-estimates in some non-steady-state conditions
     * (ramp-up, ramp-down, other stalls). We can detect many of these
     * by further considering the number of "idle" threads, that are
     * known to have zero queued tasks, so we compensate by a factor of
     * (#idle/#active) threads.
     */
    final int getEstimatedSurplusTaskCount() {
        return queueTop - queueBase - pool.idlePerActive();
    }
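
    /*
     * For example (an illustrative sketch, not code used in this
     * class), a divide-and-conquer task might consult this estimate
     * via ForkJoinTask.getSurplusQueuedTaskCount() and compare it
     * with a threshold such as the recommended 3, forking only while
     * local surplus is low:
     *
     *   protected void compute() {        // inside a RecursiveAction
     *     if (size < SEQUENTIAL_CUTOFF ||
     *         ForkJoinTask.getSurplusQueuedTaskCount() > 3)
     *       computeSequentially();        // enough queued work already
     *     else {
     *       SubTask left = newLeftHalf(), right = newRightHalf();
     *       left.fork();                  // make work stealable
     *       right.compute();
     *       left.join();
     *     }
     *   }
     *
     * (SEQUENTIAL_CUTOFF, SubTask, and the helper methods above are
     * hypothetical.)
     */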

    /**
     * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
     * pool's active count ctl maintenance, but rather than blocking
     * when tasks cannot be found, we rescan until all others cannot
     * find tasks either. The bracketing by pool quiescerCounts
     * updates suppresses pool auto-shutdown mechanics that could
     * otherwise prematurely terminate the pool because all threads
     * appear to be inactive.
     */
    final void helpQuiescePool() {
        boolean active = true;
        ForkJoinTask<?> ps = currentSteal; // to restore below
        ForkJoinPool p = pool;
        p.addQuiescerCount(1);
        for (;;) {
            ForkJoinWorkerThread[] ws = p.workers;
            ForkJoinWorkerThread v = null;
            int n;
            if (queueTop != queueBase)
                v = this;
            else if (ws != null && (n = ws.length) > 1) {
                ForkJoinWorkerThread w;
                int r = nextSeed(); // cheap version of FJP.scan
                int steps = n << 1;
                for (int i = 0; i < steps; ++i) {
                    if ((w = ws[(i + r) & (n - 1)]) != null &&
                        w.queueBase != w.queueTop) {
                        v = w;
                        break;
                    }
                }
            }
            if (v != null) {
                ForkJoinTask<?> t;
                if (!active) {
                    active = true;
                    p.addActiveCount(1);
                }
                if ((t = (v != this) ? v.deqTask() :
                     locallyFifo ? locallyDeqTask() : popTask()) != null) {
                    currentSteal = t;
                    t.doExec();
                    currentSteal = ps;
                }
            }
            else {
                if (active) {
                    active = false;
                    p.addActiveCount(-1);
                }
                if (p.isQuiescent()) {
                    p.addActiveCount(1);
                    p.addQuiescerCount(-1);
                    break;
                }
            }
        }
    }

    // Unsafe mechanics
    private static final Unsafe UNSAFE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        int s;
        try {
            UNSAFE = Unsafe.getUnsafe();
            Class<?> a = ForkJoinTask[].class;
            ABASE = UNSAFE.arrayBaseOffset(a);
            s = UNSAFE.arrayIndexScale(a);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
    }
}