rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 13:15:11 +0100
changeset 1896 9984d9a62bc0
parent 1890 212417b74b72
permissions -rw-r--r--
Making java.util.concurrent compilable without references to sun.misc.Unsafe and co.
jaroslav@1890
     1
/*
jaroslav@1890
     2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
jaroslav@1890
     3
 *
jaroslav@1890
     4
 * This code is free software; you can redistribute it and/or modify it
jaroslav@1890
     5
 * under the terms of the GNU General Public License version 2 only, as
jaroslav@1890
     6
 * published by the Free Software Foundation.  Oracle designates this
jaroslav@1890
     7
 * particular file as subject to the "Classpath" exception as provided
jaroslav@1890
     8
 * by Oracle in the LICENSE file that accompanied this code.
jaroslav@1890
     9
 *
jaroslav@1890
    10
 * This code is distributed in the hope that it will be useful, but WITHOUT
jaroslav@1890
    11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
jaroslav@1890
    12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
jaroslav@1890
    13
 * version 2 for more details (a copy is included in the LICENSE file that
jaroslav@1890
    14
 * accompanied this code).
jaroslav@1890
    15
 *
jaroslav@1890
    16
 * You should have received a copy of the GNU General Public License version
jaroslav@1890
    17
 * 2 along with this work; if not, write to the Free Software Foundation,
jaroslav@1890
    18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
jaroslav@1890
    19
 *
jaroslav@1890
    20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
jaroslav@1890
    21
 * or visit www.oracle.com if you need additional information or have any
jaroslav@1890
    22
 * questions.
jaroslav@1890
    23
 */
jaroslav@1890
    24
jaroslav@1890
    25
/*
jaroslav@1890
    26
 * This file is available under and governed by the GNU General Public
jaroslav@1890
    27
 * License version 2 only, as published by the Free Software Foundation.
jaroslav@1890
    28
 * However, the following notice accompanied the original version of this
jaroslav@1890
    29
 * file:
jaroslav@1890
    30
 *
jaroslav@1890
    31
 * Written by Doug Lea with assistance from members of JCP JSR-166
jaroslav@1890
    32
 * Expert Group and released to the public domain, as explained at
jaroslav@1890
    33
 * http://creativecommons.org/publicdomain/zero/1.0/
jaroslav@1890
    34
 */
jaroslav@1890
    35
jaroslav@1890
    36
package java.util.concurrent;
jaroslav@1890
    37
jaroslav@1890
    38
import java.util.Collection;
jaroslav@1890
    39
import java.util.concurrent.RejectedExecutionException;
jaroslav@1890
    40
jaroslav@1890
    41
/**
jaroslav@1890
    42
 * A thread managed by a {@link ForkJoinPool}, which executes
jaroslav@1890
    43
 * {@link ForkJoinTask}s.
jaroslav@1890
    44
 * This class is subclassable solely for the sake of adding
jaroslav@1890
    45
 * functionality -- there are no overridable methods dealing with
jaroslav@1890
    46
 * scheduling or execution.  However, you can override initialization
jaroslav@1890
    47
 * and termination methods surrounding the main task processing loop.
jaroslav@1890
    48
 * If you do create such a subclass, you will also need to supply a
jaroslav@1890
    49
 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
jaroslav@1890
    50
 * in a {@code ForkJoinPool}.
jaroslav@1890
    51
 *
jaroslav@1890
    52
 * @since 1.7
jaroslav@1890
    53
 * @author Doug Lea
jaroslav@1890
    54
 */
jaroslav@1890
    55
public class ForkJoinWorkerThread extends Thread {
jaroslav@1890
    56
    /*
jaroslav@1890
    57
     * Overview:
jaroslav@1890
    58
     *
jaroslav@1890
    59
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
jaroslav@1890
    60
     * ForkJoinTasks. This class includes bookkeeping in support of
jaroslav@1890
    61
     * worker activation, suspension, and lifecycle control described
jaroslav@1890
    62
     * in more detail in the internal documentation of class
jaroslav@1890
    63
     * ForkJoinPool. And as described further below, this class also
jaroslav@1890
    64
     * includes special-cased support for some ForkJoinTask
jaroslav@1890
    65
     * methods. But the main mechanics involve work-stealing:
jaroslav@1890
    66
     *
jaroslav@1890
    67
     * Work-stealing queues are special forms of Deques that support
jaroslav@1890
    68
     * only three of the four possible end-operations -- push, pop,
jaroslav@1890
    69
     * and deq (aka steal), under the further constraints that push
jaroslav@1890
    70
     * and pop are called only from the owning thread, while deq may
jaroslav@1890
    71
     * be called from other threads.  (If you are unfamiliar with
jaroslav@1890
    72
     * them, you probably want to read Herlihy and Shavit's book "The
jaroslav@1890
    73
     * Art of Multiprocessor Programming", chapter 16 describing these
jaroslav@1890
    74
     * in more detail before proceeding.)  The main work-stealing
jaroslav@1890
    75
     * queue design is roughly similar to those in the papers "Dynamic
jaroslav@1890
    76
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
jaroslav@1890
    77
     * (http://research.sun.com/scalable/pubs/index.html) and
jaroslav@1890
    78
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
jaroslav@1890
    79
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
jaroslav@1890
    80
     * The main differences ultimately stem from gc requirements that
jaroslav@1890
    81
     * we null out taken slots as soon as we can, to maintain as small
jaroslav@1890
    82
     * a footprint as possible even in programs generating huge
jaroslav@1890
    83
     * numbers of tasks. To accomplish this, we shift the CAS
jaroslav@1890
    84
     * arbitrating pop vs deq (steal) from being on the indices
jaroslav@1890
    85
     * ("queueBase" and "queueTop") to the slots themselves (mainly
jaroslav@1890
    86
     * via method "casSlotNull()"). So, both a successful pop and deq
jaroslav@1890
    87
     * mainly entail a CAS of a slot from non-null to null.  Because
jaroslav@1890
    88
     * we rely on CASes of references, we do not need tag bits on
jaroslav@1890
    89
     * queueBase or queueTop.  They are simple ints as used in any
jaroslav@1890
    90
     * circular array-based queue (see for example ArrayDeque).
jaroslav@1890
    91
     * Updates to the indices must still be ordered in a way that
jaroslav@1890
    92
     * guarantees that queueTop == queueBase means the queue is empty,
jaroslav@1890
    93
     * but otherwise may err on the side of possibly making the queue
jaroslav@1890
    94
     * appear nonempty when a push, pop, or deq have not fully
jaroslav@1890
    95
     * committed. Note that this means that the deq operation,
jaroslav@1890
    96
     * considered individually, is not wait-free. One thief cannot
jaroslav@1890
    97
     * successfully continue until another in-progress one (or, if
jaroslav@1890
    98
     * previously empty, a push) completes.  However, in the
jaroslav@1890
    99
     * aggregate, we ensure at least probabilistic non-blockingness.
jaroslav@1890
   100
     * If an attempted steal fails, a thief always chooses a different
jaroslav@1890
   101
     * random victim target to try next. So, in order for one thief to
jaroslav@1890
   102
     * progress, it suffices for any in-progress deq or new push on
jaroslav@1890
   103
     * any empty queue to complete.
jaroslav@1890
   104
     *
jaroslav@1890
   105
     * This approach also enables support for "async mode" where local
jaroslav@1890
   106
     * task processing is in FIFO, not LIFO order; simply by using a
jaroslav@1890
   107
     * version of deq rather than pop when locallyFifo is true (as set
jaroslav@1890
   108
     * by the ForkJoinPool).  This allows use in message-passing
jaroslav@1890
   109
     * frameworks in which tasks are never joined.  However neither
jaroslav@1890
   110
     * mode considers affinities, loads, cache localities, etc, so
jaroslav@1890
   111
     * they rarely provide the best possible performance on a given
jaroslav@1890
   112
     * machine, but portably provide good throughput by averaging over
jaroslav@1890
   113
     * these factors.  (Further, even if we did try to use such
jaroslav@1890
   114
     * information, we do not usually have a basis for exploiting
jaroslav@1890
   115
     * it. For example, some sets of tasks profit from cache
jaroslav@1890
   116
     * affinities, but others are harmed by cache pollution effects.)
jaroslav@1890
   117
     *
jaroslav@1890
   118
     * When a worker would otherwise be blocked waiting to join a
jaroslav@1890
   119
     * task, it first tries a form of linear helping: Each worker
jaroslav@1890
   120
     * records (in field currentSteal) the most recent task it stole
jaroslav@1890
   121
     * from some other worker. Plus, it records (in field currentJoin)
jaroslav@1890
   122
     * the task it is currently actively joining. Method joinTask uses
jaroslav@1890
   123
     * these markers to try to find a worker to help (i.e., steal back
jaroslav@1890
   124
     * a task from and execute it) that could hasten completion of the
jaroslav@1890
   125
     * actively joined task. In essence, the joiner executes a task
jaroslav@1890
   126
     * that would be on its own local deque had the to-be-joined task
jaroslav@1890
   127
     * not been stolen. This may be seen as a conservative variant of
jaroslav@1890
   128
     * the approach in Wagner & Calder "Leapfrogging: a portable
jaroslav@1890
   129
     * technique for implementing efficient futures" SIGPLAN Notices,
jaroslav@1890
   130
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
jaroslav@1890
   131
     * in that: (1) We only maintain dependency links across workers
jaroslav@1890
   132
     * upon steals, rather than use per-task bookkeeping.  This may
jaroslav@1890
   133
     * require a linear scan of workers array to locate stealers, but
jaroslav@1890
   134
     * usually doesn't because stealers leave hints (that may become
jaroslav@1890
   135
     * stale/wrong) of where to locate them. This isolates cost to
jaroslav@1890
   136
     * when it is needed, rather than adding to per-task overhead.
jaroslav@1890
   137
     * (2) It is "shallow", ignoring nesting and potentially cyclic
jaroslav@1890
   138
     * mutual steals.  (3) It is intentionally racy: field currentJoin
jaroslav@1890
   139
     * is updated only while actively joining, which means that we
jaroslav@1890
   140
     * miss links in the chain during long-lived tasks, GC stalls etc
jaroslav@1890
   141
     * (which is OK since blocking in such cases is usually a good
jaroslav@1890
   142
     * idea).  (4) We bound the number of attempts to find work (see
jaroslav@1890
   143
     * MAX_HELP) and fall back to suspending the worker and if
jaroslav@1890
   144
     * necessary replacing it with another.
jaroslav@1890
   145
     *
jaroslav@1890
   146
     * Efficient implementation of these algorithms currently relies
jaroslav@1890
   147
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
jaroslav@1890
   148
     * correct orderings, reads and writes of variable queueBase
jaroslav@1890
   149
     * require volatile ordering.  Variable queueTop need not be
jaroslav@1890
   150
     * volatile because non-local reads always follow those of
jaroslav@1890
   151
     * queueBase.  Similarly, because they are protected by volatile
jaroslav@1890
   152
     * queueBase reads, reads of the queue array and its slots by
jaroslav@1890
   153
     * other threads do not need volatile load semantics, but writes
jaroslav@1890
   154
     * (in push) require store order and CASes (in pop and deq)
jaroslav@1890
   155
     * require (volatile) CAS semantics.  (Michael, Saraswat, and
jaroslav@1890
   156
     * Vechev's algorithm has similar properties, but without support
jaroslav@1890
   157
     * for nulling slots.)  Since these combinations aren't supported
jaroslav@1890
   158
     * using ordinary volatiles, the only way to accomplish these
jaroslav@1890
   159
     * efficiently is to use direct Unsafe calls. (Using external
jaroslav@1890
   160
     * AtomicIntegers and AtomicReferenceArrays for the indices and
jaroslav@1890
   161
     * array is significantly slower because of memory locality and
jaroslav@1890
   162
     * indirection effects.)
jaroslav@1890
   163
     *
jaroslav@1890
   164
     * Further, performance on most platforms is very sensitive to
jaroslav@1890
   165
     * placement and sizing of the (resizable) queue array.  Even
jaroslav@1890
   166
     * though these queues don't usually become all that big, the
jaroslav@1890
   167
     * initial size must be large enough to counteract cache
jaroslav@1890
   168
     * contention effects across multiple queues (especially in the
jaroslav@1890
   169
     * presence of GC cardmarking). Also, to improve thread-locality,
jaroslav@1890
   170
     * queues are initialized after starting.
jaroslav@1890
   171
     */
jaroslav@1890
   172
jaroslav@1890
   173
    /**
jaroslav@1890
   174
     * Mask for pool indices encoded as shorts
jaroslav@1890
   175
     */
jaroslav@1890
   176
    private static final int  SMASK  = 0xffff;
jaroslav@1890
   177
jaroslav@1890
   178
    /**
jaroslav@1890
   179
     * Capacity of work-stealing queue array upon initialization.
jaroslav@1890
   180
     * Must be a power of two. Initial size must be at least 4, but is
jaroslav@1890
   181
     * padded to minimize cache effects.
jaroslav@1890
   182
     */
jaroslav@1890
   183
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
jaroslav@1890
   184
jaroslav@1890
   185
    /**
jaroslav@1890
   186
     * Maximum size for queue array. Must be a power of two
jaroslav@1890
   187
     * less than or equal to 1 << (31 - width of array entry) to
jaroslav@1890
   188
     * ensure lack of index wraparound, but is capped at a lower
jaroslav@1890
   189
     * value to help users trap runaway computations.
jaroslav@1890
   190
     */
jaroslav@1890
   191
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
jaroslav@1890
   192
jaroslav@1890
   193
    /**
jaroslav@1890
   194
     * The work-stealing queue array. Size must be a power of two.
jaroslav@1890
   195
     * Initialized when started (as oposed to when constructed), to
jaroslav@1890
   196
     * improve memory locality.
jaroslav@1890
   197
     */
jaroslav@1890
   198
    ForkJoinTask<?>[] queue;
jaroslav@1890
   199
jaroslav@1890
   200
    /**
jaroslav@1890
   201
     * The pool this thread works in. Accessed directly by ForkJoinTask.
jaroslav@1890
   202
     */
jaroslav@1890
   203
    final ForkJoinPool pool;
jaroslav@1890
   204
jaroslav@1890
   205
    /**
jaroslav@1890
   206
     * Index (mod queue.length) of next queue slot to push to or pop
jaroslav@1890
   207
     * from. It is written only by owner thread, and accessed by other
jaroslav@1890
   208
     * threads only after reading (volatile) queueBase.  Both queueTop
jaroslav@1890
   209
     * and queueBase are allowed to wrap around on overflow, but
jaroslav@1890
   210
     * (queueTop - queueBase) still estimates size.
jaroslav@1890
   211
     */
jaroslav@1890
   212
    int queueTop;
jaroslav@1890
   213
jaroslav@1890
   214
    /**
jaroslav@1890
   215
     * Index (mod queue.length) of least valid queue slot, which is
jaroslav@1890
   216
     * always the next position to steal from if nonempty.
jaroslav@1890
   217
     */
jaroslav@1890
   218
    volatile int queueBase;
jaroslav@1890
   219
jaroslav@1890
   220
    /**
jaroslav@1890
   221
     * The index of most recent stealer, used as a hint to avoid
jaroslav@1890
   222
     * traversal in method helpJoinTask. This is only a hint because a
jaroslav@1890
   223
     * worker might have had multiple steals and this only holds one
jaroslav@1890
   224
     * of them (usually the most current). Declared non-volatile,
jaroslav@1890
   225
     * relying on other prevailing sync to keep reasonably current.
jaroslav@1890
   226
     */
jaroslav@1890
   227
    int stealHint;
jaroslav@1890
   228
jaroslav@1890
   229
    /**
jaroslav@1890
   230
     * Index of this worker in pool array. Set once by pool before
jaroslav@1890
   231
     * running, and accessed directly by pool to locate this worker in
jaroslav@1890
   232
     * its workers array.
jaroslav@1890
   233
     */
jaroslav@1890
   234
    final int poolIndex;
jaroslav@1890
   235
jaroslav@1890
   236
    /**
jaroslav@1890
   237
     * Encoded record for pool task waits. Usages are always
jaroslav@1890
   238
     * surrounded by volatile reads/writes
jaroslav@1890
   239
     */
jaroslav@1890
   240
    int nextWait;
jaroslav@1890
   241
jaroslav@1890
   242
    /**
jaroslav@1890
   243
     * Complement of poolIndex, offset by count of entries of task
jaroslav@1890
   244
     * waits. Accessed by ForkJoinPool to manage event waiters.
jaroslav@1890
   245
     */
jaroslav@1890
   246
    volatile int eventCount;
jaroslav@1890
   247
jaroslav@1890
   248
    /**
jaroslav@1890
   249
     * Seed for random number generator for choosing steal victims.
jaroslav@1890
   250
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
jaroslav@1890
   251
     */
jaroslav@1890
   252
    int seed;
jaroslav@1890
   253
jaroslav@1890
   254
    /**
jaroslav@1890
   255
     * Number of steals. Directly accessed (and reset) by pool when
jaroslav@1890
   256
     * idle.
jaroslav@1890
   257
     */
jaroslav@1890
   258
    int stealCount;
jaroslav@1890
   259
jaroslav@1890
   260
    /**
jaroslav@1890
   261
     * True if this worker should or did terminate
jaroslav@1890
   262
     */
jaroslav@1890
   263
    volatile boolean terminate;
jaroslav@1890
   264
jaroslav@1890
   265
    /**
jaroslav@1890
   266
     * Set to true before LockSupport.park; false on return
jaroslav@1890
   267
     */
jaroslav@1890
   268
    volatile boolean parked;
jaroslav@1890
   269
jaroslav@1890
   270
    /**
jaroslav@1890
   271
     * True if use local fifo, not default lifo, for local polling.
jaroslav@1890
   272
     * Shadows value from ForkJoinPool.
jaroslav@1890
   273
     */
jaroslav@1890
   274
    final boolean locallyFifo;
jaroslav@1890
   275
jaroslav@1890
   276
    /**
jaroslav@1890
   277
     * The task most recently stolen from another worker (or
jaroslav@1890
   278
     * submission queue).  All uses are surrounded by enough volatile
jaroslav@1890
   279
     * reads/writes to maintain as non-volatile.
jaroslav@1890
   280
     */
jaroslav@1890
   281
    ForkJoinTask<?> currentSteal;
jaroslav@1890
   282
jaroslav@1890
   283
    /**
jaroslav@1890
   284
     * The task currently being joined, set only when actively trying
jaroslav@1890
   285
     * to help other stealers in helpJoinTask. All uses are surrounded
jaroslav@1890
   286
     * by enough volatile reads/writes to maintain as non-volatile.
jaroslav@1890
   287
     */
jaroslav@1890
   288
    ForkJoinTask<?> currentJoin;
jaroslav@1890
   289
jaroslav@1890
   290
    /**
jaroslav@1890
   291
     * Creates a ForkJoinWorkerThread operating in the given pool.
jaroslav@1890
   292
     *
jaroslav@1890
   293
     * @param pool the pool this thread works in
jaroslav@1890
   294
     * @throws NullPointerException if pool is null
jaroslav@1890
   295
     */
jaroslav@1890
   296
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
jaroslav@1890
   297
        super(pool.nextWorkerName());
jaroslav@1890
   298
        this.pool = pool;
jaroslav@1890
   299
        int k = pool.registerWorker(this);
jaroslav@1890
   300
        poolIndex = k;
jaroslav@1890
   301
        eventCount = ~k & SMASK; // clear wait count
jaroslav@1890
   302
        locallyFifo = pool.locallyFifo;
jaroslav@1890
   303
        Thread.UncaughtExceptionHandler ueh = pool.ueh;
jaroslav@1890
   304
        if (ueh != null)
jaroslav@1890
   305
            setUncaughtExceptionHandler(ueh);
jaroslav@1890
   306
        setDaemon(true);
jaroslav@1890
   307
    }
jaroslav@1890
   308
jaroslav@1890
   309
    // Public methods
jaroslav@1890
   310
jaroslav@1890
   311
    /**
jaroslav@1890
   312
     * Returns the pool hosting this thread.
jaroslav@1890
   313
     *
jaroslav@1890
   314
     * @return the pool
jaroslav@1890
   315
     */
jaroslav@1890
   316
    public ForkJoinPool getPool() {
jaroslav@1890
   317
        return pool;
jaroslav@1890
   318
    }
jaroslav@1890
   319
jaroslav@1890
   320
    /**
jaroslav@1890
   321
     * Returns the index number of this thread in its pool.  The
jaroslav@1890
   322
     * returned value ranges from zero to the maximum number of
jaroslav@1890
   323
     * threads (minus one) that have ever been created in the pool.
jaroslav@1890
   324
     * This method may be useful for applications that track status or
jaroslav@1890
   325
     * collect results per-worker rather than per-task.
jaroslav@1890
   326
     *
jaroslav@1890
   327
     * @return the index number
jaroslav@1890
   328
     */
jaroslav@1890
   329
    public int getPoolIndex() {
jaroslav@1890
   330
        return poolIndex;
jaroslav@1890
   331
    }
jaroslav@1890
   332
jaroslav@1890
   333
    // Randomization
jaroslav@1890
   334
jaroslav@1890
   335
    /**
jaroslav@1890
   336
     * Computes next value for random victim probes and backoffs.
jaroslav@1890
   337
     * Scans don't require a very high quality generator, but also not
jaroslav@1890
   338
     * a crummy one.  Marsaglia xor-shift is cheap and works well
jaroslav@1890
   339
     * enough.  Note: This is manually inlined in FJP.scan() to avoid
jaroslav@1890
   340
     * writes inside busy loops.
jaroslav@1890
   341
     */
jaroslav@1890
   342
    private int nextSeed() {
jaroslav@1890
   343
        int r = seed;
jaroslav@1890
   344
        r ^= r << 13;
jaroslav@1890
   345
        r ^= r >>> 17;
jaroslav@1890
   346
        r ^= r << 5;
jaroslav@1890
   347
        return seed = r;
jaroslav@1890
   348
    }
jaroslav@1890
   349
jaroslav@1890
   350
    // Run State management
jaroslav@1890
   351
jaroslav@1890
   352
    /**
jaroslav@1890
   353
     * Initializes internal state after construction but before
jaroslav@1890
   354
     * processing any tasks. If you override this method, you must
jaroslav@1890
   355
     * invoke {@code super.onStart()} at the beginning of the method.
jaroslav@1890
   356
     * Initialization requires care: Most fields must have legal
jaroslav@1890
   357
     * default values, to ensure that attempted accesses from other
jaroslav@1890
   358
     * threads work correctly even before this thread starts
jaroslav@1890
   359
     * processing tasks.
jaroslav@1890
   360
     */
jaroslav@1890
   361
    protected void onStart() {
jaroslav@1890
   362
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
jaroslav@1890
   363
        int r = pool.workerSeedGenerator.nextInt();
jaroslav@1890
   364
        seed = (r == 0) ? 1 : r; //  must be nonzero
jaroslav@1890
   365
    }
jaroslav@1890
   366
jaroslav@1890
   367
    /**
jaroslav@1890
   368
     * Performs cleanup associated with termination of this worker
jaroslav@1890
   369
     * thread.  If you override this method, you must invoke
jaroslav@1890
   370
     * {@code super.onTermination} at the end of the overridden method.
jaroslav@1890
   371
     *
jaroslav@1890
   372
     * @param exception the exception causing this thread to abort due
jaroslav@1890
   373
     * to an unrecoverable error, or {@code null} if completed normally
jaroslav@1890
   374
     */
jaroslav@1890
   375
    protected void onTermination(Throwable exception) {
jaroslav@1890
   376
        try {
jaroslav@1890
   377
            terminate = true;
jaroslav@1890
   378
            cancelTasks();
jaroslav@1890
   379
            pool.deregisterWorker(this, exception);
jaroslav@1890
   380
        } catch (Throwable ex) {        // Shouldn't ever happen
jaroslav@1890
   381
            if (exception == null)      // but if so, at least rethrown
jaroslav@1890
   382
                exception = ex;
jaroslav@1890
   383
        } finally {
jaroslav@1890
   384
            if (exception != null)
jaroslav@1890
   385
                UNSAFE.throwException(exception);
jaroslav@1890
   386
        }
jaroslav@1890
   387
    }
jaroslav@1890
   388
jaroslav@1890
   389
    /**
jaroslav@1890
   390
     * This method is required to be public, but should never be
jaroslav@1890
   391
     * called explicitly. It performs the main run loop to execute
jaroslav@1890
   392
     * {@link ForkJoinTask}s.
jaroslav@1890
   393
     */
jaroslav@1890
   394
    public void run() {
jaroslav@1890
   395
        Throwable exception = null;
jaroslav@1890
   396
        try {
jaroslav@1890
   397
            onStart();
jaroslav@1890
   398
            pool.work(this);
jaroslav@1890
   399
        } catch (Throwable ex) {
jaroslav@1890
   400
            exception = ex;
jaroslav@1890
   401
        } finally {
jaroslav@1890
   402
            onTermination(exception);
jaroslav@1890
   403
        }
jaroslav@1890
   404
    }
jaroslav@1890
   405
jaroslav@1890
   406
    /*
jaroslav@1890
   407
     * Intrinsics-based atomic writes for queue slots. These are
jaroslav@1890
   408
     * basically the same as methods in AtomicReferenceArray, but
jaroslav@1890
   409
     * specialized for (1) ForkJoinTask elements (2) requirement that
jaroslav@1890
   410
     * nullness and bounds checks have already been performed by
jaroslav@1890
   411
     * callers and (3) effective offsets are known not to overflow
jaroslav@1890
   412
     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
jaroslav@1890
   413
     * need corresponding version for reads: plain array reads are OK
jaroslav@1890
   414
     * because they are protected by other volatile reads and are
jaroslav@1890
   415
     * confirmed by CASes.
jaroslav@1890
   416
     *
jaroslav@1890
   417
     * Most uses don't actually call these methods, but instead
jaroslav@1890
   418
     * contain inlined forms that enable more predictable
jaroslav@1890
   419
     * optimization.  We don't define the version of write used in
jaroslav@1890
   420
     * pushTask at all, but instead inline there a store-fenced array
jaroslav@1890
   421
     * slot write.
jaroslav@1890
   422
     *
jaroslav@1890
   423
     * Also in most methods, as a performance (not correctness) issue,
jaroslav@1890
   424
     * we'd like to encourage compilers not to arbitrarily postpone
jaroslav@1890
   425
     * setting queueTop after writing slot.  Currently there is no
jaroslav@1890
   426
     * intrinsic for arranging this, but using Unsafe putOrderedInt
jaroslav@1890
   427
     * may be a preferable strategy on some compilers even though its
jaroslav@1890
   428
     * main effect is a pre-, not post- fence. To simplify possible
jaroslav@1890
   429
     * changes, the option is left in comments next to the associated
jaroslav@1890
   430
     * assignments.
jaroslav@1890
   431
     */
jaroslav@1890
   432
jaroslav@1890
   433
    /**
jaroslav@1890
   434
     * CASes slot i of array q from t to null. Caller must ensure q is
jaroslav@1890
   435
     * non-null and index is in range.
jaroslav@1890
   436
     */
jaroslav@1890
   437
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
jaroslav@1890
   438
                                             ForkJoinTask<?> t) {
jaroslav@1890
   439
        return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
jaroslav@1890
   440
    }
jaroslav@1890
   441
jaroslav@1890
   442
    /**
jaroslav@1890
   443
     * Performs a volatile write of the given task at given slot of
jaroslav@1890
   444
     * array q.  Caller must ensure q is non-null and index is in
jaroslav@1890
   445
     * range. This method is used only during resets and backouts.
jaroslav@1890
   446
     */
jaroslav@1890
   447
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
jaroslav@1890
   448
                                        ForkJoinTask<?> t) {
jaroslav@1890
   449
        UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
jaroslav@1890
   450
    }
jaroslav@1890
   451
jaroslav@1890
   452
    // queue methods
jaroslav@1890
   453
jaroslav@1890
   454
    /**
jaroslav@1890
   455
     * Pushes a task. Call only from this thread.
jaroslav@1890
   456
     *
jaroslav@1890
   457
     * @param t the task. Caller must ensure non-null.
jaroslav@1890
   458
     */
jaroslav@1890
   459
    final void pushTask(ForkJoinTask<?> t) {
jaroslav@1890
   460
        ForkJoinTask<?>[] q; int s, m;
jaroslav@1890
   461
        if ((q = queue) != null) {    // ignore if queue removed
jaroslav@1890
   462
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
jaroslav@1890
   463
            UNSAFE.putOrderedObject(q, u, t);
jaroslav@1890
   464
            queueTop = s + 1;         // or use putOrderedInt
jaroslav@1890
   465
            if ((s -= queueBase) <= 2)
jaroslav@1890
   466
                pool.signalWork();
jaroslav@1890
   467
            else if (s == m)
jaroslav@1890
   468
                growQueue();
jaroslav@1890
   469
        }
jaroslav@1890
   470
    }
jaroslav@1890
   471
jaroslav@1890
   472
    /**
jaroslav@1890
   473
     * Creates or doubles queue array.  Transfers elements by
jaroslav@1890
   474
     * emulating steals (deqs) from old array and placing, oldest
jaroslav@1890
   475
     * first, into new array.
jaroslav@1890
   476
     */
jaroslav@1890
   477
    private void growQueue() {
jaroslav@1890
   478
        ForkJoinTask<?>[] oldQ = queue;
jaroslav@1890
   479
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
jaroslav@1890
   480
        if (size > MAXIMUM_QUEUE_CAPACITY)
jaroslav@1890
   481
            throw new RejectedExecutionException("Queue capacity exceeded");
jaroslav@1890
   482
        if (size < INITIAL_QUEUE_CAPACITY)
jaroslav@1890
   483
            size = INITIAL_QUEUE_CAPACITY;
jaroslav@1890
   484
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
jaroslav@1890
   485
        int mask = size - 1;
jaroslav@1890
   486
        int top = queueTop;
jaroslav@1890
   487
        int oldMask;
jaroslav@1890
   488
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
jaroslav@1890
   489
            for (int b = queueBase; b != top; ++b) {
jaroslav@1890
   490
                long u = ((b & oldMask) << ASHIFT) + ABASE;
jaroslav@1890
   491
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
jaroslav@1890
   492
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
jaroslav@1890
   493
                    UNSAFE.putObjectVolatile
jaroslav@1890
   494
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
jaroslav@1890
   495
            }
jaroslav@1890
   496
        }
jaroslav@1890
   497
    }
jaroslav@1890
   498
jaroslav@1890
   499
    /**
jaroslav@1890
   500
     * Tries to take a task from the base of the queue, failing if
jaroslav@1890
   501
     * empty or contended. Note: Specializations of this code appear
jaroslav@1890
   502
     * in locallyDeqTask and elsewhere.
jaroslav@1890
   503
     *
jaroslav@1890
   504
     * @return a task, or null if none or contended
jaroslav@1890
   505
     */
jaroslav@1890
   506
    final ForkJoinTask<?> deqTask() {
jaroslav@1890
   507
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
jaroslav@1890
   508
        if (queueTop != (b = queueBase) &&
jaroslav@1890
   509
            (q = queue) != null && // must read q after b
jaroslav@1890
   510
            (i = (q.length - 1) & b) >= 0 &&
jaroslav@1890
   511
            (t = q[i]) != null && queueBase == b &&
jaroslav@1890
   512
            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
jaroslav@1890
   513
            queueBase = b + 1;
jaroslav@1890
   514
            return t;
jaroslav@1890
   515
        }
jaroslav@1890
   516
        return null;
jaroslav@1890
   517
    }
jaroslav@1890
   518
jaroslav@1890
   519
    /**
jaroslav@1890
   520
     * Tries to take a task from the base of own queue.  Called only
jaroslav@1890
   521
     * by this thread.
jaroslav@1890
   522
     *
jaroslav@1890
   523
     * @return a task, or null if none
jaroslav@1890
   524
     */
jaroslav@1890
   525
    final ForkJoinTask<?> locallyDeqTask() {
jaroslav@1890
   526
        ForkJoinTask<?> t; int m, b, i;
jaroslav@1890
   527
        ForkJoinTask<?>[] q = queue;
jaroslav@1890
   528
        if (q != null && (m = q.length - 1) >= 0) {
jaroslav@1890
   529
            while (queueTop != (b = queueBase)) {
jaroslav@1890
   530
                if ((t = q[i = m & b]) != null &&
jaroslav@1890
   531
                    queueBase == b &&
jaroslav@1890
   532
                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
jaroslav@1890
   533
                                                t, null)) {
jaroslav@1890
   534
                    queueBase = b + 1;
jaroslav@1890
   535
                    return t;
jaroslav@1890
   536
                }
jaroslav@1890
   537
            }
jaroslav@1890
   538
        }
jaroslav@1890
   539
        return null;
jaroslav@1890
   540
    }
jaroslav@1890
   541
jaroslav@1890
   542
    /**
jaroslav@1890
   543
     * Returns a popped task, or null if empty.
jaroslav@1890
   544
     * Called only by this thread.
jaroslav@1890
   545
     */
jaroslav@1890
   546
    private ForkJoinTask<?> popTask() {
jaroslav@1890
   547
        int m;
jaroslav@1890
   548
        ForkJoinTask<?>[] q = queue;
jaroslav@1890
   549
        if (q != null && (m = q.length - 1) >= 0) {
jaroslav@1890
   550
            for (int s; (s = queueTop) != queueBase;) {
jaroslav@1890
   551
                int i = m & --s;
jaroslav@1890
   552
                long u = (i << ASHIFT) + ABASE; // raw offset
jaroslav@1890
   553
                ForkJoinTask<?> t = q[i];
jaroslav@1890
   554
                if (t == null)   // lost to stealer
jaroslav@1890
   555
                    break;
jaroslav@1890
   556
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
jaroslav@1890
   557
                    queueTop = s; // or putOrderedInt
jaroslav@1890
   558
                    return t;
jaroslav@1890
   559
                }
jaroslav@1890
   560
            }
jaroslav@1890
   561
        }
jaroslav@1890
   562
        return null;
jaroslav@1890
   563
    }
jaroslav@1890
   564
jaroslav@1890
   565
    /**
jaroslav@1890
   566
     * Specialized version of popTask to pop only if topmost element
jaroslav@1890
   567
     * is the given task. Called only by this thread.
jaroslav@1890
   568
     *
jaroslav@1890
   569
     * @param t the task. Caller must ensure non-null.
jaroslav@1890
   570
     */
jaroslav@1890
   571
    final boolean unpushTask(ForkJoinTask<?> t) {
jaroslav@1890
   572
        ForkJoinTask<?>[] q;
jaroslav@1890
   573
        int s;
jaroslav@1890
   574
        if ((q = queue) != null && (s = queueTop) != queueBase &&
jaroslav@1890
   575
            UNSAFE.compareAndSwapObject
jaroslav@1890
   576
            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
jaroslav@1890
   577
            queueTop = s; // or putOrderedInt
jaroslav@1890
   578
            return true;
jaroslav@1890
   579
        }
jaroslav@1890
   580
        return false;
jaroslav@1890
   581
    }
jaroslav@1890
   582
jaroslav@1890
   583
    /**
jaroslav@1890
   584
     * Returns next task, or null if empty or contended.
jaroslav@1890
   585
     */
jaroslav@1890
   586
    final ForkJoinTask<?> peekTask() {
jaroslav@1890
   587
        int m;
jaroslav@1890
   588
        ForkJoinTask<?>[] q = queue;
jaroslav@1890
   589
        if (q == null || (m = q.length - 1) < 0)
jaroslav@1890
   590
            return null;
jaroslav@1890
   591
        int i = locallyFifo ? queueBase : (queueTop - 1);
jaroslav@1890
   592
        return q[i & m];
jaroslav@1890
   593
    }
jaroslav@1890
   594
jaroslav@1890
   595
    // Support methods for ForkJoinPool
jaroslav@1890
   596
jaroslav@1890
   597
    /**
jaroslav@1890
   598
     * Runs the given task, plus any local tasks until queue is empty
jaroslav@1890
   599
     */
jaroslav@1890
   600
    final void execTask(ForkJoinTask<?> t) {
jaroslav@1890
   601
        currentSteal = t;
jaroslav@1890
   602
        for (;;) {
jaroslav@1890
   603
            if (t != null)
jaroslav@1890
   604
                t.doExec();
jaroslav@1890
   605
            if (queueTop == queueBase)
jaroslav@1890
   606
                break;
jaroslav@1890
   607
            t = locallyFifo ? locallyDeqTask() : popTask();
jaroslav@1890
   608
        }
jaroslav@1890
   609
        ++stealCount;
jaroslav@1890
   610
        currentSteal = null;
jaroslav@1890
   611
    }
jaroslav@1890
   612
jaroslav@1890
   613
    /**
jaroslav@1890
   614
     * Removes and cancels all tasks in queue.  Can be called from any
jaroslav@1890
   615
     * thread.
jaroslav@1890
   616
     */
jaroslav@1890
   617
    final void cancelTasks() {
jaroslav@1890
   618
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
jaroslav@1890
   619
        if (cj != null && cj.status >= 0)
jaroslav@1890
   620
            cj.cancelIgnoringExceptions();
jaroslav@1890
   621
        ForkJoinTask<?> cs = currentSteal;
jaroslav@1890
   622
        if (cs != null && cs.status >= 0)
jaroslav@1890
   623
            cs.cancelIgnoringExceptions();
jaroslav@1890
   624
        while (queueBase != queueTop) {
jaroslav@1890
   625
            ForkJoinTask<?> t = deqTask();
jaroslav@1890
   626
            if (t != null)
jaroslav@1890
   627
                t.cancelIgnoringExceptions();
jaroslav@1890
   628
        }
jaroslav@1890
   629
    }
jaroslav@1890
   630
jaroslav@1890
   631
    /**
jaroslav@1890
   632
     * Drains tasks to given collection c.
jaroslav@1890
   633
     *
jaroslav@1890
   634
     * @return the number of tasks drained
jaroslav@1890
   635
     */
jaroslav@1890
   636
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
jaroslav@1890
   637
        int n = 0;
jaroslav@1890
   638
        while (queueBase != queueTop) {
jaroslav@1890
   639
            ForkJoinTask<?> t = deqTask();
jaroslav@1890
   640
            if (t != null) {
jaroslav@1890
   641
                c.add(t);
jaroslav@1890
   642
                ++n;
jaroslav@1890
   643
            }
jaroslav@1890
   644
        }
jaroslav@1890
   645
        return n;
jaroslav@1890
   646
    }
jaroslav@1890
   647
jaroslav@1890
   648
    // Support methods for ForkJoinTask
jaroslav@1890
   649
jaroslav@1890
   650
    /**
jaroslav@1890
   651
     * Returns an estimate of the number of tasks in the queue.
jaroslav@1890
   652
     */
jaroslav@1890
   653
    final int getQueueSize() {
jaroslav@1890
   654
        return queueTop - queueBase;
jaroslav@1890
   655
    }
jaroslav@1890
   656
jaroslav@1890
   657
    /**
jaroslav@1890
   658
     * Gets and removes a local task.
jaroslav@1890
   659
     *
jaroslav@1890
   660
     * @return a task, if available
jaroslav@1890
   661
     */
jaroslav@1890
   662
    final ForkJoinTask<?> pollLocalTask() {
jaroslav@1890
   663
        return locallyFifo ? locallyDeqTask() : popTask();
jaroslav@1890
   664
    }
jaroslav@1890
   665
jaroslav@1890
   666
    /**
jaroslav@1890
   667
     * Gets and removes a local or stolen task.
jaroslav@1890
   668
     *
jaroslav@1890
   669
     * @return a task, if available
jaroslav@1890
   670
     */
jaroslav@1890
   671
    final ForkJoinTask<?> pollTask() {
jaroslav@1890
   672
        ForkJoinWorkerThread[] ws;
jaroslav@1890
   673
        ForkJoinTask<?> t = pollLocalTask();
jaroslav@1890
   674
        if (t != null || (ws = pool.workers) == null)
jaroslav@1890
   675
            return t;
jaroslav@1890
   676
        int n = ws.length; // cheap version of FJP.scan
jaroslav@1890
   677
        int steps = n << 1;
jaroslav@1890
   678
        int r = nextSeed();
jaroslav@1890
   679
        int i = 0;
jaroslav@1890
   680
        while (i < steps) {
jaroslav@1890
   681
            ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
jaroslav@1890
   682
            if (w != null && w.queueBase != w.queueTop && w.queue != null) {
jaroslav@1890
   683
                if ((t = w.deqTask()) != null)
jaroslav@1890
   684
                    return t;
jaroslav@1890
   685
                i = 0;
jaroslav@1890
   686
            }
jaroslav@1890
   687
        }
jaroslav@1890
   688
        return null;
jaroslav@1890
   689
    }
jaroslav@1890
   690
jaroslav@1890
   691
    /**
jaroslav@1890
   692
     * The maximum stolen->joining link depth allowed in helpJoinTask,
jaroslav@1890
   693
     * as well as the maximum number of retries (allowing on average
jaroslav@1890
   694
     * one staleness retry per level) per attempt to instead try
jaroslav@1890
   695
     * compensation.  Depths for legitimate chains are unbounded, but
jaroslav@1890
   696
     * we use a fixed constant to avoid (otherwise unchecked) cycles
jaroslav@1890
   697
     * and bound staleness of traversal parameters at the expense of
jaroslav@1890
   698
     * sometimes blocking when we could be helping.
jaroslav@1890
   699
     */
jaroslav@1890
   700
    private static final int MAX_HELP = 16;
jaroslav@1890
   701
jaroslav@1890
   702
    /**
jaroslav@1890
   703
     * Possibly runs some tasks and/or blocks, until joinMe is done.
jaroslav@1890
   704
     *
jaroslav@1890
   705
     * @param joinMe the task to join
jaroslav@1890
   706
     * @return completion status on exit
jaroslav@1890
   707
     */
jaroslav@1890
   708
    final int joinTask(ForkJoinTask<?> joinMe) {
jaroslav@1890
   709
        ForkJoinTask<?> prevJoin = currentJoin;
jaroslav@1890
   710
        currentJoin = joinMe;
jaroslav@1890
   711
        for (int s, retries = MAX_HELP;;) {
jaroslav@1890
   712
            if ((s = joinMe.status) < 0) {
jaroslav@1890
   713
                currentJoin = prevJoin;
jaroslav@1890
   714
                return s;
jaroslav@1890
   715
            }
jaroslav@1890
   716
            if (retries > 0) {
jaroslav@1890
   717
                if (queueTop != queueBase) {
jaroslav@1890
   718
                    if (!localHelpJoinTask(joinMe))
jaroslav@1890
   719
                        retries = 0;           // cannot help
jaroslav@1890
   720
                }
jaroslav@1890
   721
                else if (retries == MAX_HELP >>> 1) {
jaroslav@1890
   722
                    --retries;                 // check uncommon case
jaroslav@1890
   723
                    if (tryDeqAndExec(joinMe) >= 0)
jaroslav@1890
   724
                        Thread.yield();        // for politeness
jaroslav@1890
   725
                }
jaroslav@1890
   726
                else
jaroslav@1890
   727
                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
jaroslav@1890
   728
            }
jaroslav@1890
   729
            else {
jaroslav@1890
   730
                retries = MAX_HELP;           // restart if not done
jaroslav@1890
   731
                pool.tryAwaitJoin(joinMe);
jaroslav@1890
   732
            }
jaroslav@1890
   733
        }
jaroslav@1890
   734
    }
jaroslav@1890
   735
jaroslav@1890
   736
    /**
jaroslav@1890
   737
     * If present, pops and executes the given task, or any other
jaroslav@1890
   738
     * cancelled task
jaroslav@1890
   739
     *
jaroslav@1890
   740
     * @return false if any other non-cancelled task exists in local queue
jaroslav@1890
   741
     */
jaroslav@1890
   742
    private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
jaroslav@1890
   743
        int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
jaroslav@1890
   744
        if ((s = queueTop) != queueBase && (q = queue) != null &&
jaroslav@1890
   745
            (i = (q.length - 1) & --s) >= 0 &&
jaroslav@1890
   746
            (t = q[i]) != null) {
jaroslav@1890
   747
            if (t != joinMe && t.status >= 0)
jaroslav@1890
   748
                return false;
jaroslav@1890
   749
            if (UNSAFE.compareAndSwapObject
jaroslav@1890
   750
                (q, (i << ASHIFT) + ABASE, t, null)) {
jaroslav@1890
   751
                queueTop = s;           // or putOrderedInt
jaroslav@1890
   752
                t.doExec();
jaroslav@1890
   753
            }
jaroslav@1890
   754
        }
jaroslav@1890
   755
        return true;
jaroslav@1890
   756
    }
jaroslav@1890
   757
jaroslav@1890
   758
    /**
jaroslav@1890
   759
     * Tries to locate and execute tasks for a stealer of the given
jaroslav@1890
   760
     * task, or in turn one of its stealers, Traces
jaroslav@1890
   761
     * currentSteal->currentJoin links looking for a thread working on
jaroslav@1890
   762
     * a descendant of the given task and with a non-empty queue to
jaroslav@1890
   763
     * steal back and execute tasks from.  The implementation is very
jaroslav@1890
   764
     * branchy to cope with potential inconsistencies or loops
jaroslav@1890
   765
     * encountering chains that are stale, unknown, or of length
jaroslav@1890
   766
     * greater than MAX_HELP links.  All of these cases are dealt with
jaroslav@1890
   767
     * by just retrying by caller.
jaroslav@1890
   768
     *
jaroslav@1890
   769
     * @param joinMe the task to join
jaroslav@1890
   770
     * @param canSteal true if local queue is empty
jaroslav@1890
   771
     * @return true if ran a task
jaroslav@1890
   772
     */
jaroslav@1890
   773
    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
jaroslav@1890
   774
        boolean helped = false;
jaroslav@1890
   775
        int m = pool.scanGuard & SMASK;
jaroslav@1890
   776
        ForkJoinWorkerThread[] ws = pool.workers;
jaroslav@1890
   777
        if (ws != null && ws.length > m && joinMe.status >= 0) {
jaroslav@1890
   778
            int levels = MAX_HELP;              // remaining chain length
jaroslav@1890
   779
            ForkJoinTask<?> task = joinMe;      // base of chain
jaroslav@1890
   780
            outer:for (ForkJoinWorkerThread thread = this;;) {
jaroslav@1890
   781
                // Try to find v, the stealer of task, by first using hint
jaroslav@1890
   782
                ForkJoinWorkerThread v = ws[thread.stealHint & m];
jaroslav@1890
   783
                if (v == null || v.currentSteal != task) {
jaroslav@1890
   784
                    for (int j = 0; ;) {        // search array
jaroslav@1890
   785
                        if ((v = ws[j]) != null && v.currentSteal == task) {
jaroslav@1890
   786
                            thread.stealHint = j;
jaroslav@1890
   787
                            break;              // save hint for next time
jaroslav@1890
   788
                        }
jaroslav@1890
   789
                        if (++j > m)
jaroslav@1890
   790
                            break outer;        // can't find stealer
jaroslav@1890
   791
                    }
jaroslav@1890
   792
                }
jaroslav@1890
   793
                // Try to help v, using specialized form of deqTask
jaroslav@1890
   794
                for (;;) {
jaroslav@1890
   795
                    ForkJoinTask<?>[] q; int b, i;
jaroslav@1890
   796
                    if (joinMe.status < 0)
jaroslav@1890
   797
                        break outer;
jaroslav@1890
   798
                    if ((b = v.queueBase) == v.queueTop ||
jaroslav@1890
   799
                        (q = v.queue) == null ||
jaroslav@1890
   800
                        (i = (q.length-1) & b) < 0)
jaroslav@1890
   801
                        break;                  // empty
jaroslav@1890
   802
                    long u = (i << ASHIFT) + ABASE;
jaroslav@1890
   803
                    ForkJoinTask<?> t = q[i];
jaroslav@1890
   804
                    if (task.status < 0)
jaroslav@1890
   805
                        break outer;            // stale
jaroslav@1890
   806
                    if (t != null && v.queueBase == b &&
jaroslav@1890
   807
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
jaroslav@1890
   808
                        v.queueBase = b + 1;
jaroslav@1890
   809
                        v.stealHint = poolIndex;
jaroslav@1890
   810
                        ForkJoinTask<?> ps = currentSteal;
jaroslav@1890
   811
                        currentSteal = t;
jaroslav@1890
   812
                        t.doExec();
jaroslav@1890
   813
                        currentSteal = ps;
jaroslav@1890
   814
                        helped = true;
jaroslav@1890
   815
                    }
jaroslav@1890
   816
                }
jaroslav@1890
   817
                // Try to descend to find v's stealer
jaroslav@1890
   818
                ForkJoinTask<?> next = v.currentJoin;
jaroslav@1890
   819
                if (--levels > 0 && task.status >= 0 &&
jaroslav@1890
   820
                    next != null && next != task) {
jaroslav@1890
   821
                    task = next;
jaroslav@1890
   822
                    thread = v;
jaroslav@1890
   823
                }
jaroslav@1890
   824
                else
jaroslav@1890
   825
                    break;  // max levels, stale, dead-end, or cyclic
jaroslav@1890
   826
            }
jaroslav@1890
   827
        }
jaroslav@1890
   828
        return helped;
jaroslav@1890
   829
    }
jaroslav@1890
   830
jaroslav@1890
   831
    /**
jaroslav@1890
   832
     * Performs an uncommon case for joinTask: If task t is at base of
jaroslav@1890
   833
     * some workers queue, steals and executes it.
jaroslav@1890
   834
     *
jaroslav@1890
   835
     * @param t the task
jaroslav@1890
   836
     * @return t's status
jaroslav@1890
   837
     */
jaroslav@1890
   838
    private int tryDeqAndExec(ForkJoinTask<?> t) {
jaroslav@1890
   839
        int m = pool.scanGuard & SMASK;
jaroslav@1890
   840
        ForkJoinWorkerThread[] ws = pool.workers;
jaroslav@1890
   841
        if (ws != null && ws.length > m && t.status >= 0) {
jaroslav@1890
   842
            for (int j = 0; j <= m; ++j) {
jaroslav@1890
   843
                ForkJoinTask<?>[] q; int b, i;
jaroslav@1890
   844
                ForkJoinWorkerThread v = ws[j];
jaroslav@1890
   845
                if (v != null &&
jaroslav@1890
   846
                    (b = v.queueBase) != v.queueTop &&
jaroslav@1890
   847
                    (q = v.queue) != null &&
jaroslav@1890
   848
                    (i = (q.length - 1) & b) >= 0 &&
jaroslav@1890
   849
                    q[i] ==  t) {
jaroslav@1890
   850
                    long u = (i << ASHIFT) + ABASE;
jaroslav@1890
   851
                    if (v.queueBase == b &&
jaroslav@1890
   852
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
jaroslav@1890
   853
                        v.queueBase = b + 1;
jaroslav@1890
   854
                        v.stealHint = poolIndex;
jaroslav@1890
   855
                        ForkJoinTask<?> ps = currentSteal;
jaroslav@1890
   856
                        currentSteal = t;
jaroslav@1890
   857
                        t.doExec();
jaroslav@1890
   858
                        currentSteal = ps;
jaroslav@1890
   859
                    }
jaroslav@1890
   860
                    break;
jaroslav@1890
   861
                }
jaroslav@1890
   862
            }
jaroslav@1890
   863
        }
jaroslav@1890
   864
        return t.status;
jaroslav@1890
   865
    }
jaroslav@1890
   866
jaroslav@1890
   867
    /**
jaroslav@1890
   868
     * Implements ForkJoinTask.getSurplusQueuedTaskCount().  Returns
jaroslav@1890
   869
     * an estimate of the number of tasks, offset by a function of
jaroslav@1890
   870
     * number of idle workers.
jaroslav@1890
   871
     *
jaroslav@1890
   872
     * This method provides a cheap heuristic guide for task
jaroslav@1890
   873
     * partitioning when programmers, frameworks, tools, or languages
jaroslav@1890
   874
     * have little or no idea about task granularity.  In essence by
jaroslav@1890
   875
     * offering this method, we ask users only about tradeoffs in
jaroslav@1890
   876
     * overhead vs expected throughput and its variance, rather than
jaroslav@1890
   877
     * how finely to partition tasks.
jaroslav@1890
   878
     *
jaroslav@1890
   879
     * In a steady state strict (tree-structured) computation, each
jaroslav@1890
   880
     * thread makes available for stealing enough tasks for other
jaroslav@1890
   881
     * threads to remain active. Inductively, if all threads play by
jaroslav@1890
   882
     * the same rules, each thread should make available only a
jaroslav@1890
   883
     * constant number of tasks.
jaroslav@1890
   884
     *
jaroslav@1890
   885
     * The minimum useful constant is just 1. But using a value of 1
jaroslav@1890
   886
     * would require immediate replenishment upon each steal to
jaroslav@1890
   887
     * maintain enough tasks, which is infeasible.  Further,
jaroslav@1890
   888
     * partitionings/granularities of offered tasks should minimize
jaroslav@1890
   889
     * steal rates, which in general means that threads nearer the top
jaroslav@1890
   890
     * of computation tree should generate more than those nearer the
jaroslav@1890
   891
     * bottom. In perfect steady state, each thread is at
jaroslav@1890
   892
     * approximately the same level of computation tree. However,
jaroslav@1890
   893
     * producing extra tasks amortizes the uncertainty of progress and
jaroslav@1890
   894
     * diffusion assumptions.
jaroslav@1890
   895
     *
jaroslav@1890
   896
     * So, users will want to use values larger, but not much larger
jaroslav@1890
   897
     * than 1 to both smooth over transient shortages and hedge
jaroslav@1890
   898
     * against uneven progress; as traded off against the cost of
jaroslav@1890
   899
     * extra task overhead. We leave the user to pick a threshold
jaroslav@1890
   900
     * value to compare with the results of this call to guide
jaroslav@1890
   901
     * decisions, but recommend values such as 3.
jaroslav@1890
   902
     *
jaroslav@1890
   903
     * When all threads are active, it is on average OK to estimate
jaroslav@1890
   904
     * surplus strictly locally. In steady-state, if one thread is
jaroslav@1890
   905
     * maintaining say 2 surplus tasks, then so are others. So we can
jaroslav@1890
   906
     * just use estimated queue length (although note that (queueTop -
jaroslav@1890
   907
     * queueBase) can be an overestimate because of stealers lagging
jaroslav@1890
   908
     * increments of queueBase).  However, this strategy alone leads
jaroslav@1890
   909
     * to serious mis-estimates in some non-steady-state conditions
jaroslav@1890
   910
     * (ramp-up, ramp-down, other stalls). We can detect many of these
jaroslav@1890
   911
     * by further considering the number of "idle" threads, that are
jaroslav@1890
   912
     * known to have zero queued tasks, so compensate by a factor of
jaroslav@1890
   913
     * (#idle/#active) threads.
jaroslav@1890
   914
     */
jaroslav@1890
   915
    final int getEstimatedSurplusTaskCount() {
jaroslav@1890
   916
        return queueTop - queueBase - pool.idlePerActive();
jaroslav@1890
   917
    }
jaroslav@1890
   918
jaroslav@1890
   919
    /**
jaroslav@1890
   920
     * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
jaroslav@1890
   921
     * pool's active count ctl maintenance, but rather than blocking
jaroslav@1890
   922
     * when tasks cannot be found, we rescan until all others cannot
jaroslav@1890
   923
     * find tasks either. The bracketing by pool quiescerCounts
jaroslav@1890
   924
     * updates suppresses pool auto-shutdown mechanics that could
jaroslav@1890
   925
     * otherwise prematurely terminate the pool because all threads
jaroslav@1890
   926
     * appear to be inactive.
jaroslav@1890
   927
     */
jaroslav@1890
   928
    final void helpQuiescePool() {
jaroslav@1890
   929
        boolean active = true;
jaroslav@1890
   930
        ForkJoinTask<?> ps = currentSteal; // to restore below
jaroslav@1890
   931
        ForkJoinPool p = pool;
jaroslav@1890
   932
        p.addQuiescerCount(1);
jaroslav@1890
   933
        for (;;) {
jaroslav@1890
   934
            ForkJoinWorkerThread[] ws = p.workers;
jaroslav@1890
   935
            ForkJoinWorkerThread v = null;
jaroslav@1890
   936
            int n;
jaroslav@1890
   937
            if (queueTop != queueBase)
jaroslav@1890
   938
                v = this;
jaroslav@1890
   939
            else if (ws != null && (n = ws.length) > 1) {
jaroslav@1890
   940
                ForkJoinWorkerThread w;
jaroslav@1890
   941
                int r = nextSeed(); // cheap version of FJP.scan
jaroslav@1890
   942
                int steps = n << 1;
jaroslav@1890
   943
                for (int i = 0; i < steps; ++i) {
jaroslav@1890
   944
                    if ((w = ws[(i + r) & (n - 1)]) != null &&
jaroslav@1890
   945
                        w.queueBase != w.queueTop) {
jaroslav@1890
   946
                        v = w;
jaroslav@1890
   947
                        break;
jaroslav@1890
   948
                    }
jaroslav@1890
   949
                }
jaroslav@1890
   950
            }
jaroslav@1890
   951
            if (v != null) {
jaroslav@1890
   952
                ForkJoinTask<?> t;
jaroslav@1890
   953
                if (!active) {
jaroslav@1890
   954
                    active = true;
jaroslav@1890
   955
                    p.addActiveCount(1);
jaroslav@1890
   956
                }
jaroslav@1890
   957
                if ((t = (v != this) ? v.deqTask() :
jaroslav@1890
   958
                     locallyFifo ? locallyDeqTask() : popTask()) != null) {
jaroslav@1890
   959
                    currentSteal = t;
jaroslav@1890
   960
                    t.doExec();
jaroslav@1890
   961
                    currentSteal = ps;
jaroslav@1890
   962
                }
jaroslav@1890
   963
            }
jaroslav@1890
   964
            else {
jaroslav@1890
   965
                if (active) {
jaroslav@1890
   966
                    active = false;
jaroslav@1890
   967
                    p.addActiveCount(-1);
jaroslav@1890
   968
                }
jaroslav@1890
   969
                if (p.isQuiescent()) {
jaroslav@1890
   970
                    p.addActiveCount(1);
jaroslav@1890
   971
                    p.addQuiescerCount(-1);
jaroslav@1890
   972
                    break;
jaroslav@1890
   973
                }
jaroslav@1890
   974
            }
jaroslav@1890
   975
        }
jaroslav@1890
   976
    }
jaroslav@1890
   977
jaroslav@1890
   978
    // Unsafe mechanics
    private static final Unsafe UNSAFE;
    /** Offset of element 0 within a ForkJoinTask[]; slot i lives at (i << ASHIFT) + ABASE. */
    private static final long ABASE;
    /** log2 of the array element size (arrayIndexScale), used to turn indices into offsets. */
    private static final int ASHIFT;

    static {
        int s;
        try {
            UNSAFE = Unsafe.getUnsafe();
            Class<?> a = ForkJoinTask[].class;
            ABASE = UNSAFE.arrayBaseOffset(a);
            s = UNSAFE.arrayIndexScale(a);
        } catch (Exception e) {
            throw new Error(e);
        }
        // Index-to-offset conversion uses a shift, so the scale must be a power of two.
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
    }
jaroslav@1890
   997
jaroslav@1890
   998
}