rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 13:15:11 +0100
changeset 1896 9984d9a62bc0
parent 1890 212417b74b72
permissions -rw-r--r--
Making java.util.concurrent compilable without references to sun.misc.Unsafe and co.
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.Collection;
    39 import java.util.concurrent.RejectedExecutionException;
    40 
    41 /**
    42  * A thread managed by a {@link ForkJoinPool}, which executes
    43  * {@link ForkJoinTask}s.
    44  * This class is subclassable solely for the sake of adding
    45  * functionality -- there are no overridable methods dealing with
    46  * scheduling or execution.  However, you can override initialization
    47  * and termination methods surrounding the main task processing loop.
    48  * If you do create such a subclass, you will also need to supply a
    49  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
    50  * in a {@code ForkJoinPool}.
    51  *
    52  * @since 1.7
    53  * @author Doug Lea
    54  */
    55 public class ForkJoinWorkerThread extends Thread {
    56     /*
    57      * Overview:
    58      *
    59      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
    60      * ForkJoinTasks. This class includes bookkeeping in support of
    61      * worker activation, suspension, and lifecycle control described
    62      * in more detail in the internal documentation of class
    63      * ForkJoinPool. And as described further below, this class also
    64      * includes special-cased support for some ForkJoinTask
    65      * methods. But the main mechanics involve work-stealing:
    66      *
    67      * Work-stealing queues are special forms of Deques that support
    68      * only three of the four possible end-operations -- push, pop,
    69      * and deq (aka steal), under the further constraints that push
    70      * and pop are called only from the owning thread, while deq may
    71      * be called from other threads.  (If you are unfamiliar with
    72      * them, you probably want to read Herlihy and Shavit's book "The
    73      * Art of Multiprocessor programming", chapter 16 describing these
    74      * in more detail before proceeding.)  The main work-stealing
    75      * queue design is roughly similar to those in the papers "Dynamic
    76      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
    77      * (http://research.sun.com/scalable/pubs/index.html) and
    78      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
    79      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
    80      * The main differences ultimately stem from gc requirements that
    81      * we null out taken slots as soon as we can, to maintain as small
    82      * a footprint as possible even in programs generating huge
    83      * numbers of tasks. To accomplish this, we shift the CAS
    84      * arbitrating pop vs deq (steal) from being on the indices
    85      * ("queueBase" and "queueTop") to the slots themselves (mainly
    86      * via method "casSlotNull()"). So, both a successful pop and deq
    87      * mainly entail a CAS of a slot from non-null to null.  Because
    88      * we rely on CASes of references, we do not need tag bits on
    89      * queueBase or queueTop.  They are simple ints as used in any
    90      * circular array-based queue (see for example ArrayDeque).
    91      * Updates to the indices must still be ordered in a way that
    92      * guarantees that queueTop == queueBase means the queue is empty,
    93      * but otherwise may err on the side of possibly making the queue
    94      * appear nonempty when a push, pop, or deq have not fully
    95      * committed. Note that this means that the deq operation,
    96      * considered individually, is not wait-free. One thief cannot
    97      * successfully continue until another in-progress one (or, if
    98      * previously empty, a push) completes.  However, in the
    99      * aggregate, we ensure at least probabilistic non-blockingness.
   100      * If an attempted steal fails, a thief always chooses a different
   101      * random victim target to try next. So, in order for one thief to
   102      * progress, it suffices for any in-progress deq or new push on
   103      * any empty queue to complete.
   104      *
   105      * This approach also enables support for "async mode" where local
   106      * task processing is in FIFO, not LIFO order; simply by using a
   107      * version of deq rather than pop when locallyFifo is true (as set
   108      * by the ForkJoinPool).  This allows use in message-passing
   109      * frameworks in which tasks are never joined.  However neither
   110      * mode considers affinities, loads, cache localities, etc, so
   111      * rarely provide the best possible performance on a given
   112      * machine, but portably provide good throughput by averaging over
   113      * these factors.  (Further, even if we did try to use such
   114      * information, we do not usually have a basis for exploiting
   115      * it. For example, some sets of tasks profit from cache
   116      * affinities, but others are harmed by cache pollution effects.)
   117      *
   118      * When a worker would otherwise be blocked waiting to join a
   119      * task, it first tries a form of linear helping: Each worker
   120      * records (in field currentSteal) the most recent task it stole
   121      * from some other worker. Plus, it records (in field currentJoin)
   122      * the task it is currently actively joining. Method joinTask uses
   123      * these markers to try to find a worker to help (i.e., steal back
   124      * a task from and execute it) that could hasten completion of the
   125      * actively joined task. In essence, the joiner executes a task
   126      * that would be on its own local deque had the to-be-joined task
   127      * not been stolen. This may be seen as a conservative variant of
   128      * the approach in Wagner & Calder "Leapfrogging: a portable
   129      * technique for implementing efficient futures" SIGPLAN Notices,
   130      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
   131      * in that: (1) We only maintain dependency links across workers
   132      * upon steals, rather than use per-task bookkeeping.  This may
   133      * require a linear scan of workers array to locate stealers, but
   134      * usually doesn't because stealers leave hints (that may become
   135      * stale/wrong) of where to locate them. This isolates cost to
   136      * when it is needed, rather than adding to per-task overhead.
   137      * (2) It is "shallow", ignoring nesting and potentially cyclic
   138      * mutual steals.  (3) It is intentionally racy: field currentJoin
   139      * is updated only while actively joining, which means that we
   140      * miss links in the chain during long-lived tasks, GC stalls etc
   141      * (which is OK since blocking in such cases is usually a good
   142      * idea).  (4) We bound the number of attempts to find work (see
   143      * MAX_HELP) and fall back to suspending the worker and if
   144      * necessary replacing it with another.
   145      *
   146      * Efficient implementation of these algorithms currently relies
   147      * on an uncomfortable amount of "Unsafe" mechanics. To maintain
   148      * correct orderings, reads and writes of variable queueBase
   149      * require volatile ordering.  Variable queueTop need not be
   150      * volatile because non-local reads always follow those of
   151      * queueBase.  Similarly, because they are protected by volatile
   152      * queueBase reads, reads of the queue array and its slots by
   153      * other threads do not need volatile load semantics, but writes
   154      * (in push) require store order and CASes (in pop and deq)
   155      * require (volatile) CAS semantics.  (Michael, Saraswat, and
   156      * Vechev's algorithm has similar properties, but without support
   157      * for nulling slots.)  Since these combinations aren't supported
   158      * using ordinary volatiles, the only way to accomplish these
   159      * efficiently is to use direct Unsafe calls. (Using external
   160      * AtomicIntegers and AtomicReferenceArrays for the indices and
   161      * array is significantly slower because of memory locality and
   162      * indirection effects.)
   163      *
   164      * Further, performance on most platforms is very sensitive to
   165      * placement and sizing of the (resizable) queue array.  Even
   166      * though these queues don't usually become all that big, the
   167      * initial size must be large enough to counteract cache
   168      * contention effects across multiple queues (especially in the
   169      * presence of GC cardmarking). Also, to improve thread-locality,
   170      * queues are initialized after starting.
   171      */
   172 
    /**
     * Mask for pool indices encoded as shorts: keeps the low 16 bits
     * of an int. The constructor applies it to the complemented pool
     * index when computing the initial {@code eventCount}.
     */
    private static final int  SMASK  = 0xffff;

    /**
     * Capacity of work-stealing queue array upon initialization
     * (1 << 13, i.e. 8192 slots).
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum size for queue array. Must be a power of two
     * less than or equal to 1 << (31 - width of array entry) to
     * ensure lack of index wraparound, but is capped at a lower
     * value to help users trap runaway computations. Method
     * growQueue throws RejectedExecutionException instead of
     * allocating an array larger than this.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
   192 
    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when started (as opposed to when constructed), to
     * improve memory locality. Allocated in onStart and replaced by
     * growQueue; remains null until the thread has started.
     */
    ForkJoinTask<?>[] queue;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) queueBase.  Both queueTop
     * and queueBase are allowed to wrap around on overflow, but
     * (queueTop - queueBase) still estimates size.
     */
    int queueTop;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    volatile int queueBase;

    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    int stealHint;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array. Assigned in the constructor from the value
     * returned by pool.registerWorker.
     */
    final int poolIndex;

    /**
     * Encoded record for pool task waits. Usages are always
     * surrounded by volatile reads/writes.
     */
    int nextWait;

    /**
     * Complement of poolIndex, offset by count of entries of task
     * waits. Accessed by ForkJoinPool to manage event waiters.
     * Initialized in the constructor to (~poolIndex & SMASK).
     */
    volatile int eventCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero
     * (onStart substitutes 1 if the generated seed is 0).
     */
    int seed;

    /**
     * Number of steals. Directly accessed (and reset) by pool when
     * idle. Incremented once per call to execTask.
     */
    int stealCount;

    /**
     * True if this worker should or did terminate. Set to true at
     * the start of onTermination.
     */
    volatile boolean terminate;

    /**
     * Set to true before LockSupport.park; false on return.
     */
    volatile boolean parked;

    /**
     * True if use local fifo, not default lifo, for local polling.
     * Shadows value from ForkJoinPool (copied from pool.locallyFifo
     * in the constructor).
     */
    final boolean locallyFifo;

    /**
     * The task most recently stolen from another worker (or
     * submission queue).  All uses are surrounded by enough volatile
     * reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentSteal;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. All uses are surrounded
     * by enough volatile reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentJoin;
   289 
   290     /**
   291      * Creates a ForkJoinWorkerThread operating in the given pool.
   292      *
   293      * @param pool the pool this thread works in
   294      * @throws NullPointerException if pool is null
   295      */
   296     protected ForkJoinWorkerThread(ForkJoinPool pool) {
   297         super(pool.nextWorkerName());
   298         this.pool = pool;
   299         int k = pool.registerWorker(this);
   300         poolIndex = k;
   301         eventCount = ~k & SMASK; // clear wait count
   302         locallyFifo = pool.locallyFifo;
   303         Thread.UncaughtExceptionHandler ueh = pool.ueh;
   304         if (ueh != null)
   305             setUncaughtExceptionHandler(ueh);
   306         setDaemon(true);
   307     }
   308 
   309     // Public methods
   310 
   311     /**
   312      * Returns the pool hosting this thread.
   313      *
   314      * @return the pool
   315      */
   316     public ForkJoinPool getPool() {
   317         return pool;
   318     }
   319 
   320     /**
   321      * Returns the index number of this thread in its pool.  The
   322      * returned value ranges from zero to the maximum number of
   323      * threads (minus one) that have ever been created in the pool.
   324      * This method may be useful for applications that track status or
   325      * collect results per-worker rather than per-task.
   326      *
   327      * @return the index number
   328      */
   329     public int getPoolIndex() {
   330         return poolIndex;
   331     }
   332 
   333     // Randomization
   334 
   335     /**
   336      * Computes next value for random victim probes and backoffs.
   337      * Scans don't require a very high quality generator, but also not
   338      * a crummy one.  Marsaglia xor-shift is cheap and works well
   339      * enough.  Note: This is manually inlined in FJP.scan() to avoid
   340      * writes inside busy loops.
   341      */
   342     private int nextSeed() {
   343         int r = seed;
   344         r ^= r << 13;
   345         r ^= r >>> 17;
   346         r ^= r << 5;
   347         return seed = r;
   348     }
   349 
   350     // Run State management
   351 
   352     /**
   353      * Initializes internal state after construction but before
   354      * processing any tasks. If you override this method, you must
   355      * invoke {@code super.onStart()} at the beginning of the method.
   356      * Initialization requires care: Most fields must have legal
   357      * default values, to ensure that attempted accesses from other
   358      * threads work correctly even before this thread starts
   359      * processing tasks.
   360      */
   361     protected void onStart() {
   362         queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
   363         int r = pool.workerSeedGenerator.nextInt();
   364         seed = (r == 0) ? 1 : r; //  must be nonzero
   365     }
   366 
   367     /**
   368      * Performs cleanup associated with termination of this worker
   369      * thread.  If you override this method, you must invoke
   370      * {@code super.onTermination} at the end of the overridden method.
   371      *
   372      * @param exception the exception causing this thread to abort due
   373      * to an unrecoverable error, or {@code null} if completed normally
   374      */
   375     protected void onTermination(Throwable exception) {
   376         try {
   377             terminate = true;
   378             cancelTasks();
   379             pool.deregisterWorker(this, exception);
   380         } catch (Throwable ex) {        // Shouldn't ever happen
   381             if (exception == null)      // but if so, at least rethrown
   382                 exception = ex;
   383         } finally {
   384             if (exception != null)
   385                 UNSAFE.throwException(exception);
   386         }
   387     }
   388 
   389     /**
   390      * This method is required to be public, but should never be
   391      * called explicitly. It performs the main run loop to execute
   392      * {@link ForkJoinTask}s.
   393      */
   394     public void run() {
   395         Throwable exception = null;
   396         try {
   397             onStart();
   398             pool.work(this);
   399         } catch (Throwable ex) {
   400             exception = ex;
   401         } finally {
   402             onTermination(exception);
   403         }
   404     }
   405 
   406     /*
   407      * Intrinsics-based atomic writes for queue slots. These are
   408      * basically the same as methods in AtomicReferenceArray, but
   409      * specialized for (1) ForkJoinTask elements (2) requirement that
   410      * nullness and bounds checks have already been performed by
   411      * callers and (3) effective offsets are known not to overflow
   412      * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
   413      * need corresponding version for reads: plain array reads are OK
   414      * because they are protected by other volatile reads and are
   415      * confirmed by CASes.
   416      *
   417      * Most uses don't actually call these methods, but instead
   418      * contain inlined forms that enable more predictable
   419      * optimization.  We don't define the version of write used in
   420      * pushTask at all, but instead inline there a store-fenced array
   421      * slot write.
   422      *
   423      * Also in most methods, as a performance (not correctness) issue,
   424      * we'd like to encourage compilers not to arbitrarily postpone
   425      * setting queueTop after writing slot.  Currently there is no
   426      * intrinsic for arranging this, but using Unsafe putOrderedInt
   427      * may be a preferable strategy on some compilers even though its
   428      * main effect is a pre-, not post- fence. To simplify possible
   429      * changes, the option is left in comments next to the associated
   430      * assignments.
   431      */
   432 
   433     /**
   434      * CASes slot i of array q from t to null. Caller must ensure q is
   435      * non-null and index is in range.
   436      */
   437     private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
   438                                              ForkJoinTask<?> t) {
   439         return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
   440     }
   441 
    /**
     * Performs a volatile write of the given task at given slot of
     * array q.  Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     *
     * @param q the queue array to write into
     * @param i the (already masked) slot index
     * @param t the task to store
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        // Same raw-offset computation as casSlotNull: (i << ASHIFT) + ABASE.
        UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
    }
   451 
   452     // queue methods
   453 
    /**
     * Pushes a task. Call only from this thread.
     *
     * The task is stored at slot (queueTop mod length) with an
     * ordered write, and only then is queueTop advanced. Afterwards
     * the pool is signalled if the queue held at most two tasks
     * before this push, or the queue is grown if it held
     * (length - 1) tasks before this push.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            // s = current top, m = index mask; u = raw offset of slot (s & m)
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            // s now becomes (old top - base), i.e. the size before this push
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }
   471 
    /**
     * Creates or doubles queue array.  Transfers elements by
     * emulating steals (deqs) from old array and placing, oldest
     * first, into new array.
     *
     * The new array is installed in field {@code queue} before the
     * transfer starts. Each old slot between queueBase and queueTop
     * is moved only if this thread wins the CAS that clears it, so
     * slots that are already null or whose CAS fails are skipped.
     *
     * @throws RejectedExecutionException if doubling would exceed
     *         MAXIMUM_QUEUE_CAPACITY
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        // Double the existing array, or start at the initial capacity if none.
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE; // raw offset in old array
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                // Claim the old slot via CAS; on success, republish the task
                // at the same logical index in the new array.
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }
   498 
    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     *
     * Makes a single attempt: the slot at the current base must be
     * non-null, queueBase must still equal the value read earlier,
     * and the CAS clearing the slot must succeed. Only then is
     * queueBase advanced.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        if (queueTop != (b = queueBase) &&
            (q = queue) != null && // must read q after b
            (i = (q.length - 1) & b) >= 0 &&   // i = slot index for base b
            (t = q[i]) != null && queueBase == b && // recheck base is unchanged
            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
            queueBase = b + 1;
            return t;
        }
        return null;
    }
   518 
    /**
     * Tries to take a task from the base of own queue.  Called only
     * by this thread.
     *
     * Unlike deqTask, this retries for as long as queueTop differs
     * from queueBase, so it returns null only once the queue is
     * observed empty (or if there is no queue array).
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?> t; int m, b, i;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            while (queueTop != (b = queueBase)) {
                // Same take protocol as deqTask: non-null slot, base
                // unchanged, then CAS the slot to null.
                if ((t = q[i = m & b]) != null &&
                    queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
                                                t, null)) {
                    queueBase = b + 1;
                    return t;
                }
            }
        }
        return null;
    }
   541 
    /**
     * Returns a popped task, or null if empty.
     * Called only by this thread.
     *
     * Works from the top of the queue (slot queueTop - 1). Gives up
     * as soon as that slot is found null; if the slot is non-null
     * but the CAS fails, the loop re-reads queueTop and queueBase
     * and tries again.
     *
     * @return the popped task, or null if none was taken
     */
    private ForkJoinTask<?> popTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            for (int s; (s = queueTop) != queueBase;) {
                int i = m & --s;                // index of topmost slot
                long u = (i << ASHIFT) + ABASE; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)   // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueTop = s; // or putOrderedInt
                    return t;
                }
            }
        }
        return null;
    }
   564 
    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by this thread.
     *
     * Makes a single attempt: the CAS clears the topmost slot only
     * if it currently holds t, and queueTop is decremented only when
     * that CAS succeeds.
     *
     * @param t the task. Caller must ensure non-null.
     * @return true if t was the topmost task and has been removed
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q;
        int s;
        if ((q = queue) != null && (s = queueTop) != queueBase &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            queueTop = s; // or putOrderedInt
            return true;
        }
        return false;
    }
   582 
   583     /**
   584      * Returns next task, or null if empty or contended.
   585      */
   586     final ForkJoinTask<?> peekTask() {
   587         int m;
   588         ForkJoinTask<?>[] q = queue;
   589         if (q == null || (m = q.length - 1) < 0)
   590             return null;
   591         int i = locallyFifo ? queueBase : (queueTop - 1);
   592         return q[i & m];
   593     }
   594 
   595     // Support methods for ForkJoinPool
   596 
   597     /**
   598      * Runs the given task, plus any local tasks until queue is empty
   599      */
   600     final void execTask(ForkJoinTask<?> t) {
   601         currentSteal = t;
   602         for (;;) {
   603             if (t != null)
   604                 t.doExec();
   605             if (queueTop == queueBase)
   606                 break;
   607             t = locallyFifo ? locallyDeqTask() : popTask();
   608         }
   609         ++stealCount;
   610         currentSteal = null;
   611     }
   612 
   613     /**
   614      * Removes and cancels all tasks in queue.  Can be called from any
   615      * thread.
   616      */
   617     final void cancelTasks() {
   618         ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
   619         if (cj != null && cj.status >= 0)
   620             cj.cancelIgnoringExceptions();
   621         ForkJoinTask<?> cs = currentSteal;
   622         if (cs != null && cs.status >= 0)
   623             cs.cancelIgnoringExceptions();
   624         while (queueBase != queueTop) {
   625             ForkJoinTask<?> t = deqTask();
   626             if (t != null)
   627                 t.cancelIgnoringExceptions();
   628         }
   629     }
   630 
   631     /**
   632      * Drains tasks to given collection c.
   633      *
   634      * @return the number of tasks drained
   635      */
   636     final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
   637         int n = 0;
   638         while (queueBase != queueTop) {
   639             ForkJoinTask<?> t = deqTask();
   640             if (t != null) {
   641                 c.add(t);
   642                 ++n;
   643             }
   644         }
   645         return n;
   646     }
   647 
   648     // Support methods for ForkJoinTask
   649 
   650     /**
   651      * Returns an estimate of the number of tasks in the queue.
   652      */
   653     final int getQueueSize() {
   654         return queueTop - queueBase;
   655     }
   656 
   657     /**
   658      * Gets and removes a local task.
   659      *
   660      * @return a task, if available
   661      */
   662     final ForkJoinTask<?> pollLocalTask() {
   663         return locallyFifo ? locallyDeqTask() : popTask();
   664     }
   665 
   666     /**
   667      * Gets and removes a local or stolen task.
   668      *
   669      * @return a task, if available
   670      */
   671     final ForkJoinTask<?> pollTask() {
   672         ForkJoinWorkerThread[] ws;
   673         ForkJoinTask<?> t = pollLocalTask();
   674         if (t != null || (ws = pool.workers) == null)
   675             return t;
   676         int n = ws.length; // cheap version of FJP.scan
   677         int steps = n << 1;
   678         int r = nextSeed();
   679         int i = 0;
   680         while (i < steps) {
   681             ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
   682             if (w != null && w.queueBase != w.queueTop && w.queue != null) {
   683                 if ((t = w.deqTask()) != null)
   684                     return t;
   685                 i = 0;
   686             }
   687         }
   688         return null;
   689     }
   690 
    /**
     * The maximum stolen->joining link depth allowed in helpJoinTask,
     * as well as the maximum number of retries (allowing on average
     * one staleness retry per level) per attempt to instead try
     * compensation.  Depths for legitimate chains are unbounded, but
     * we use a fixed constant to avoid (otherwise unchecked) cycles
     * and bound staleness of traversal parameters at the expense of
     * sometimes blocking when we could be helping.
     *
     * Method joinTask starts its retry counter at this value and
     * treats the halfway point (MAX_HELP >>> 1) as the moment to try
     * tryDeqAndExec.
     */
    private static final int MAX_HELP = 16;
   701 
   702     /**
   703      * Possibly runs some tasks and/or blocks, until joinMe is done.
   704      *
   705      * @param joinMe the task to join
   706      * @return completion status on exit
   707      */
   708     final int joinTask(ForkJoinTask<?> joinMe) {
   709         ForkJoinTask<?> prevJoin = currentJoin;
   710         currentJoin = joinMe;
   711         for (int s, retries = MAX_HELP;;) {
   712             if ((s = joinMe.status) < 0) {
   713                 currentJoin = prevJoin;
   714                 return s;
   715             }
   716             if (retries > 0) {
   717                 if (queueTop != queueBase) {
   718                     if (!localHelpJoinTask(joinMe))
   719                         retries = 0;           // cannot help
   720                 }
   721                 else if (retries == MAX_HELP >>> 1) {
   722                     --retries;                 // check uncommon case
   723                     if (tryDeqAndExec(joinMe) >= 0)
   724                         Thread.yield();        // for politeness
   725                 }
   726                 else
   727                     retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
   728             }
   729             else {
   730                 retries = MAX_HELP;           // restart if not done
   731                 pool.tryAwaitJoin(joinMe);
   732             }
   733         }
   734     }
   735 
   736     /**
   737      * If present, pops and executes the given task, or any other
   738      * cancelled task
   739      *
   740      * @return false if any other non-cancelled task exists in local queue
   741      */
   742     private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
   743         int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
   744         if ((s = queueTop) != queueBase && (q = queue) != null &&
   745             (i = (q.length - 1) & --s) >= 0 &&
   746             (t = q[i]) != null) {
   747             if (t != joinMe && t.status >= 0)
   748                 return false;
   749             if (UNSAFE.compareAndSwapObject
   750                 (q, (i << ASHIFT) + ABASE, t, null)) {
   751                 queueTop = s;           // or putOrderedInt
   752                 t.doExec();
   753             }
   754         }
   755         return true;
   756     }
   757 
    /**
     * Tries to locate and execute tasks for a stealer of the given
     * task, or in turn one of its stealers. Traces
     * currentSteal->currentJoin links looking for a thread working on
     * a descendant of the given task and with a non-empty queue to
     * steal back and execute tasks from.  The implementation is very
     * branchy to cope with potential inconsistencies or loops
     * encountering chains that are stale, unknown, or of length
     * greater than MAX_HELP links.  All of these cases are dealt with
     * by just retrying by caller.
     *
     * @param joinMe the task to join
     * @return true if ran a task
     */
    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
        boolean helped = false;
        // m serves below both as the index mask applied to stealHint and
        // as the highest worker index examined by the linear search
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && joinMe.status >= 0) {
            int levels = MAX_HELP;              // remaining chain length
            ForkJoinTask<?> task = joinMe;      // base of chain
            outer:for (ForkJoinWorkerThread thread = this;;) {
                // Try to find v, the stealer of task, by first using hint
                ForkJoinWorkerThread v = ws[thread.stealHint & m];
                if (v == null || v.currentSteal != task) {
                    for (int j = 0; ;) {        // search array
                        if ((v = ws[j]) != null && v.currentSteal == task) {
                            thread.stealHint = j;
                            break;              // save hint for next time
                        }
                        if (++j > m)
                            break outer;        // can't find stealer
                    }
                }
                // Try to help v, using specialized form of deqTask.
                // Keeps taking from the base of v's queue until the queue
                // looks empty, joinMe is done, or task has completed.
                for (;;) {
                    ForkJoinTask<?>[] q; int b, i;
                    if (joinMe.status < 0)
                        break outer;            // nothing left to wait for
                    if ((b = v.queueBase) == v.queueTop ||
                        (q = v.queue) == null ||
                        (i = (q.length-1) & b) < 0)
                        break;                  // empty
                    long u = (i << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = q[i];
                    if (task.status < 0)
                        break outer;            // stale
                    // claim the slot only if base has not moved since it was read
                    if (t != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        // run t as our current steal, then restore the previous one
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                        helped = true;
                    }
                }
                // Try to descend to find v's stealer
                ForkJoinTask<?> next = v.currentJoin;
                if (--levels > 0 && task.status >= 0 &&
                    next != null && next != task) {
                    task = next;
                    thread = v;
                }
                else
                    break;  // max levels, stale, dead-end, or cyclic
            }
        }
        return helped;
    }
   830 
   831     /**
   832      * Performs an uncommon case for joinTask: If task t is at base of
   833      * some workers queue, steals and executes it.
   834      *
   835      * @param t the task
   836      * @return t's status
   837      */
   838     private int tryDeqAndExec(ForkJoinTask<?> t) {
   839         int m = pool.scanGuard & SMASK;
   840         ForkJoinWorkerThread[] ws = pool.workers;
   841         if (ws != null && ws.length > m && t.status >= 0) {
   842             for (int j = 0; j <= m; ++j) {
   843                 ForkJoinTask<?>[] q; int b, i;
   844                 ForkJoinWorkerThread v = ws[j];
   845                 if (v != null &&
   846                     (b = v.queueBase) != v.queueTop &&
   847                     (q = v.queue) != null &&
   848                     (i = (q.length - 1) & b) >= 0 &&
   849                     q[i] ==  t) {
   850                     long u = (i << ASHIFT) + ABASE;
   851                     if (v.queueBase == b &&
   852                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
   853                         v.queueBase = b + 1;
   854                         v.stealHint = poolIndex;
   855                         ForkJoinTask<?> ps = currentSteal;
   856                         currentSteal = t;
   857                         t.doExec();
   858                         currentSteal = ps;
   859                     }
   860                     break;
   861                 }
   862             }
   863         }
   864         return t.status;
   865     }
   866 
   867     /**
   868      * Implements ForkJoinTask.getSurplusQueuedTaskCount().  Returns
   869      * an estimate of the number of tasks, offset by a function of
   870      * number of idle workers.
   871      *
   872      * This method provides a cheap heuristic guide for task
   873      * partitioning when programmers, frameworks, tools, or languages
   874      * have little or no idea about task granularity.  In essence by
   875      * offering this method, we ask users only about tradeoffs in
   876      * overhead vs expected throughput and its variance, rather than
   877      * how finely to partition tasks.
   878      *
   879      * In a steady state strict (tree-structured) computation, each
   880      * thread makes available for stealing enough tasks for other
   881      * threads to remain active. Inductively, if all threads play by
   882      * the same rules, each thread should make available only a
   883      * constant number of tasks.
   884      *
   885      * The minimum useful constant is just 1. But using a value of 1
   886      * would require immediate replenishment upon each steal to
   887      * maintain enough tasks, which is infeasible.  Further,
   888      * partitionings/granularities of offered tasks should minimize
   889      * steal rates, which in general means that threads nearer the top
   890      * of computation tree should generate more than those nearer the
   891      * bottom. In perfect steady state, each thread is at
   892      * approximately the same level of computation tree. However,
   893      * producing extra tasks amortizes the uncertainty of progress and
   894      * diffusion assumptions.
   895      *
   896      * So, users will want to use values larger, but not much larger
   897      * than 1 to both smooth over transient shortages and hedge
   898      * against uneven progress; as traded off against the cost of
   899      * extra task overhead. We leave the user to pick a threshold
   900      * value to compare with the results of this call to guide
   901      * decisions, but recommend values such as 3.
   902      *
   903      * When all threads are active, it is on average OK to estimate
   904      * surplus strictly locally. In steady-state, if one thread is
   905      * maintaining say 2 surplus tasks, then so are others. So we can
   906      * just use estimated queue length (although note that (queueTop -
   907      * queueBase) can be an overestimate because of stealers lagging
   908      * increments of queueBase).  However, this strategy alone leads
   909      * to serious mis-estimates in some non-steady-state conditions
   910      * (ramp-up, ramp-down, other stalls). We can detect many of these
   911      * by further considering the number of "idle" threads, that are
   912      * known to have zero queued tasks, so compensate by a factor of
   913      * (#idle/#active) threads.
   914      */
   915     final int getEstimatedSurplusTaskCount() {
   916         return queueTop - queueBase - pool.idlePerActive();
   917     }
   918 
   919     /**
   920      * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
   921      * pool's active count ctl maintenance, but rather than blocking
   922      * when tasks cannot be found, we rescan until all others cannot
   923      * find tasks either. The bracketing by pool quiescerCounts
   924      * updates suppresses pool auto-shutdown mechanics that could
   925      * otherwise prematurely terminate the pool because all threads
   926      * appear to be inactive.
   927      */
   928     final void helpQuiescePool() {
   929         boolean active = true;
   930         ForkJoinTask<?> ps = currentSteal; // to restore below
   931         ForkJoinPool p = pool;
   932         p.addQuiescerCount(1);
   933         for (;;) {
   934             ForkJoinWorkerThread[] ws = p.workers;
   935             ForkJoinWorkerThread v = null;
   936             int n;
   937             if (queueTop != queueBase)
   938                 v = this;
   939             else if (ws != null && (n = ws.length) > 1) {
   940                 ForkJoinWorkerThread w;
   941                 int r = nextSeed(); // cheap version of FJP.scan
   942                 int steps = n << 1;
   943                 for (int i = 0; i < steps; ++i) {
   944                     if ((w = ws[(i + r) & (n - 1)]) != null &&
   945                         w.queueBase != w.queueTop) {
   946                         v = w;
   947                         break;
   948                     }
   949                 }
   950             }
   951             if (v != null) {
   952                 ForkJoinTask<?> t;
   953                 if (!active) {
   954                     active = true;
   955                     p.addActiveCount(1);
   956                 }
   957                 if ((t = (v != this) ? v.deqTask() :
   958                      locallyFifo ? locallyDeqTask() : popTask()) != null) {
   959                     currentSteal = t;
   960                     t.doExec();
   961                     currentSteal = ps;
   962                 }
   963             }
   964             else {
   965                 if (active) {
   966                     active = false;
   967                     p.addActiveCount(-1);
   968                 }
   969                 if (p.isQuiescent()) {
   970                     p.addActiveCount(1);
   971                     p.addQuiescerCount(-1);
   972                     break;
   973                 }
   974             }
   975         }
   976     }
   977 
   978     // Unsafe mechanics
   979     private static final Unsafe UNSAFE;
   980     private static final long ABASE;
   981     private static final int ASHIFT;
   982 
   983     static {
   984         int s;
   985         try {
   986             UNSAFE = Unsafe.getUnsafe();
   987             Class a = ForkJoinTask[].class;
   988             ABASE = UNSAFE.arrayBaseOffset(a);
   989             s = UNSAFE.arrayIndexScale(a);
   990         } catch (Exception e) {
   991             throw new Error(e);
   992         }
   993         if ((s & (s-1)) != 0)
   994             throw new Error("data type scale not a power of two");
   995         ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
   996     }
   997 
   998 }