1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/ForkJoinTask.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,1386 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +
1.41 +import java.io.Serializable;
1.42 +import java.util.Collection;
1.43 +import java.util.Collections;
1.44 +import java.util.List;
1.45 +import java.util.RandomAccess;
1.46 +import java.util.Map;
1.47 +import java.lang.ref.WeakReference;
1.48 +import java.lang.ref.ReferenceQueue;
1.49 +import java.util.concurrent.Callable;
1.50 +import java.util.concurrent.CancellationException;
1.51 +import java.util.concurrent.ExecutionException;
1.52 +import java.util.concurrent.Executor;
1.53 +import java.util.concurrent.ExecutorService;
1.54 +import java.util.concurrent.Future;
1.55 +import java.util.concurrent.RejectedExecutionException;
1.56 +import java.util.concurrent.RunnableFuture;
1.57 +import java.util.concurrent.TimeUnit;
1.58 +import java.util.concurrent.TimeoutException;
1.59 +import java.util.concurrent.locks.ReentrantLock;
1.60 +import java.lang.reflect.Constructor;
1.61 +
1.62 +/**
1.63 + * Abstract base class for tasks that run within a {@link ForkJoinPool}.
1.64 + * A {@code ForkJoinTask} is a thread-like entity that is much
1.65 + * lighter weight than a normal thread. Huge numbers of tasks and
1.66 + * subtasks may be hosted by a small number of actual threads in a
1.67 + * ForkJoinPool, at the price of some usage limitations.
1.68 + *
1.69 + * <p>A "main" {@code ForkJoinTask} begins execution when submitted
1.70 + * to a {@link ForkJoinPool}. Once started, it will usually in turn
1.71 + * start other subtasks. As indicated by the name of this class,
1.72 + * many programs using {@code ForkJoinTask} employ only methods
1.73 + * {@link #fork} and {@link #join}, or derivatives such as {@link
1.74 + * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
1.75 + * provides a number of other methods that can come into play in
1.76 + * advanced usages, as well as extension mechanics that allow
1.77 + * support of new forms of fork/join processing.
1.78 + *
1.79 + * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
1.80 + * The efficiency of {@code ForkJoinTask}s stems from a set of
1.81 + * restrictions (that are only partially statically enforceable)
1.82 + * reflecting their intended use as computational tasks calculating
1.83 + * pure functions or operating on purely isolated objects. The
1.84 + * primary coordination mechanisms are {@link #fork}, that arranges
1.85 + * asynchronous execution, and {@link #join}, that doesn't proceed
1.86 + * until the task's result has been computed. Computations should
1.87 + * avoid {@code synchronized} methods or blocks, and should minimize
1.88 + * other blocking synchronization apart from joining other tasks or
1.89 + * using synchronizers such as Phasers that are advertised to
1.90 + * cooperate with fork/join scheduling. Tasks should also not perform
1.91 + * blocking IO, and should ideally access variables that are
1.92 + * completely independent of those accessed by other running
1.93 + * tasks. Minor breaches of these restrictions, for example using
1.94 + * shared output streams, may be tolerable in practice, but frequent
1.95 + * use may result in poor performance, and the potential to
1.96 + * indefinitely stall if the number of threads not waiting for IO or
1.97 + * other external synchronization becomes exhausted. This usage
1.98 + * restriction is in part enforced by not permitting checked
1.99 + * exceptions such as {@code IOExceptions} to be thrown. However,
1.100 + * computations may still encounter unchecked exceptions, that are
1.101 + * rethrown to callers attempting to join them. These exceptions may
1.102 + * additionally include {@link RejectedExecutionException} stemming
1.103 + * from internal resource exhaustion, such as failure to allocate
1.104 + * internal task queues. Rethrown exceptions behave in the same way as
1.105 + * regular exceptions, but, when possible, contain stack traces (as
1.106 + * displayed for example using {@code ex.printStackTrace()}) of both
1.107 + * the thread that initiated the computation as well as the thread
1.108 + * actually encountering the exception; minimally only the latter.
1.109 + *
1.110 + * <p>The primary method for awaiting completion and extracting
1.111 + * results of a task is {@link #join}, but there are several variants:
1.112 + * The {@link Future#get} methods support interruptible and/or timed
1.113 + * waits for completion and report results using {@code Future}
1.114 + * conventions. Method {@link #invoke} is semantically
1.115 + * equivalent to {@code fork(); join()} but always attempts to begin
1.116 + * execution in the current thread. The "<em>quiet</em>" forms of
1.117 + * these methods do not extract results or report exceptions. These
1.118 + * may be useful when a set of tasks are being executed, and you need
1.119 + * to delay processing of results or exceptions until all complete.
1.120 + * Method {@code invokeAll} (available in multiple versions)
1.121 + * performs the most common form of parallel invocation: forking a set
1.122 + * of tasks and joining them all.
1.123 + *
1.124 + * <p>The execution status of tasks may be queried at several levels
1.125 + * of detail: {@link #isDone} is true if a task completed in any way
1.126 + * (including the case where a task was cancelled without executing);
1.127 + * {@link #isCompletedNormally} is true if a task completed without
1.128 + * cancellation or encountering an exception; {@link #isCancelled} is
1.129 + * true if the task was cancelled (in which case {@link #getException}
1.130 + * returns a {@link java.util.concurrent.CancellationException}); and
1.131 + * {@link #isCompletedAbnormally} is true if a task was either
1.132 + * cancelled or encountered an exception, in which case {@link
1.133 + * #getException} will return either the encountered exception or
1.134 + * {@link java.util.concurrent.CancellationException}.
1.135 + *
1.136 + * <p>The ForkJoinTask class is not usually directly subclassed.
1.137 + * Instead, you subclass one of the abstract classes that support a
1.138 + * particular style of fork/join processing, typically {@link
1.139 + * RecursiveAction} for computations that do not return results, or
1.140 + * {@link RecursiveTask} for those that do. Normally, a concrete
1.141 + * ForkJoinTask subclass declares fields comprising its parameters,
1.142 + * established in a constructor, and then defines a {@code compute}
1.143 + * method that somehow uses the control methods supplied by this base
1.144 + * class. While these methods have {@code public} access (to allow
1.145 + * instances of different task subclasses to call each other's
1.146 + * methods), some of them may only be called from within other
1.147 + * ForkJoinTasks (as may be determined using method {@link
1.148 + * #inForkJoinPool}). Attempts to invoke them in other contexts
1.149 + * result in exceptions or errors, possibly including
1.150 + * {@code ClassCastException}.
1.151 + *
1.152 + * <p>Method {@link #join} and its variants are appropriate for use
1.153 + * only when completion dependencies are acyclic; that is, the
1.154 + * parallel computation can be described as a directed acyclic graph
1.155 + * (DAG). Otherwise, executions may encounter a form of deadlock as
1.156 + * tasks cyclically wait for each other. However, this framework
1.157 + * supports other methods and techniques (for example the use of
1.158 + * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
1.159 + * may be of use in constructing custom subclasses for problems that
1.160 + * are not statically structured as DAGs.
1.161 + *
1.162 + * <p>Most base support methods are {@code final}, to prevent
1.163 + * overriding of implementations that are intrinsically tied to the
1.164 + * underlying lightweight task scheduling framework. Developers
1.165 + * creating new basic styles of fork/join processing should minimally
1.166 + * implement {@code protected} methods {@link #exec}, {@link
1.167 + * #setRawResult}, and {@link #getRawResult}, while also introducing
1.168 + * an abstract computational method that can be implemented in its
1.169 + * subclasses, possibly relying on other {@code protected} methods
1.170 + * provided by this class.
1.171 + *
1.172 + * <p>ForkJoinTasks should perform relatively small amounts of
1.173 + * computation. Large tasks should be split into smaller subtasks,
1.174 + * usually via recursive decomposition. As a very rough rule of thumb,
1.175 + * a task should perform more than 100 and less than 10000 basic
1.176 + * computational steps, and should avoid indefinite looping. If tasks
1.177 + * are too big, then parallelism cannot improve throughput. If too
1.178 + * small, then memory and internal task maintenance overhead may
1.179 + * overwhelm processing.
1.180 + *
1.181 + * <p>This class provides {@code adapt} methods for {@link Runnable}
1.182 + * and {@link Callable}, that may be of use when mixing execution of
1.183 + * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
1.184 + * of this form, consider using a pool constructed in <em>asyncMode</em>.
1.185 + *
1.186 + * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
1.187 + * used in extensions such as remote execution frameworks. It is
1.188 + * sensible to serialize tasks only before or after, but not during,
1.189 + * execution. Serialization is not relied on during execution itself.
1.190 + *
1.191 + * @since 1.7
1.192 + * @author Doug Lea
1.193 + */
1.194 +public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
1.195 +
1.196 + /*
1.197 + * See the internal documentation of class ForkJoinPool for a
1.198 + * general implementation overview. ForkJoinTasks are mainly
1.199 + * responsible for maintaining their "status" field amidst relays
1.200 + * to methods in ForkJoinWorkerThread and ForkJoinPool. The
1.201 + * methods of this class are more-or-less layered into (1) basic
1.202 + * status maintenance (2) execution and awaiting completion (3)
1.203 + * user-level methods that additionally report results. This is
1.204 + * sometimes hard to see because this file orders exported methods
1.205 + * in a way that flows well in javadocs.
1.206 + */
1.207 +
1.208 + /*
1.209 + * The status field holds run control status bits packed into a
1.210 + * single int to minimize footprint and to ensure atomicity (via
1.211 + * CAS). Status is initially zero, and takes on nonnegative
1.212 + * values until completed, upon which status holds value
1.213 + * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
1.214 + * waits by other threads have the SIGNAL bit set. Completion of
1.215 + * a stolen task with SIGNAL set awakens any waiters via
1.216 + * notifyAll. Even though suboptimal for some purposes, we use
1.217 + * basic builtin wait/notify to take advantage of "monitor
1.218 + * inflation" in JVMs that we would otherwise need to emulate to
1.219 + * avoid adding further per-task bookkeeping overhead. We want
1.220 + * these monitors to be "fat", i.e., not use biasing or thin-lock
1.221 + * techniques, so use some odd coding idioms that tend to avoid
1.222 + * them.
1.223 + */
1.224 +
1.225 + /** The run status of this task */
1.226 + volatile int status; // accessed directly by pool and workers
1.227 + private static final int NORMAL = -1;
1.228 + private static final int CANCELLED = -2;
1.229 + private static final int EXCEPTIONAL = -3;
1.230 + private static final int SIGNAL = 1;
1.231 +
1.232 + /**
1.233 + * Marks completion and wakes up threads waiting to join this task,
1.234 + * also clearing signal request bits.
1.235 + *
1.236 + * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
1.237 + * @return completion status on exit
1.238 + */
1.239 + private int setCompletion(int completion) {
1.240 + for (int s;;) {
1.241 + if ((s = status) < 0)
1.242 + return s;
1.243 + if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
1.244 + if (s != 0)
1.245 + synchronized (this) { notifyAll(); }
1.246 + return completion;
1.247 + }
1.248 + }
1.249 + }
1.250 +
1.251 + /**
1.252 + * Tries to block a worker thread until completed or timed out.
1.253 + * Uses Object.wait time argument conventions.
1.254 + * May fail on contention or interrupt.
1.255 + *
1.256 + * @param millis if > 0, wait time.
1.257 + */
1.258 + final void tryAwaitDone(long millis) {
1.259 + int s;
1.260 + try {
1.261 + if (((s = status) > 0 ||
1.262 + (s == 0 &&
1.263 + UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
1.264 + status > 0) {
1.265 + synchronized (this) {
1.266 + if (status > 0)
1.267 + wait(millis);
1.268 + }
1.269 + }
1.270 + } catch (InterruptedException ie) {
1.271 + // caller must check termination
1.272 + }
1.273 + }
1.274 +
1.275 + /**
1.276 + * Blocks a non-worker-thread until completion.
1.277 + * @return status upon completion
1.278 + */
1.279 + private int externalAwaitDone() {
1.280 + int s;
1.281 + if ((s = status) >= 0) {
1.282 + boolean interrupted = false;
1.283 + synchronized (this) {
1.284 + while ((s = status) >= 0) {
1.285 + if (s == 0)
1.286 + UNSAFE.compareAndSwapInt(this, statusOffset,
1.287 + 0, SIGNAL);
1.288 + else {
1.289 + try {
1.290 + wait();
1.291 + } catch (InterruptedException ie) {
1.292 + interrupted = true;
1.293 + }
1.294 + }
1.295 + }
1.296 + }
1.297 + if (interrupted)
1.298 + Thread.currentThread().interrupt();
1.299 + }
1.300 + return s;
1.301 + }
1.302 +
1.303 + /**
1.304 + * Blocks a non-worker-thread until completion or interruption or timeout.
1.305 + */
1.306 + private int externalInterruptibleAwaitDone(long millis)
1.307 + throws InterruptedException {
1.308 + int s;
1.309 + if (Thread.interrupted())
1.310 + throw new InterruptedException();
1.311 + if ((s = status) >= 0) {
1.312 + synchronized (this) {
1.313 + while ((s = status) >= 0) {
1.314 + if (s == 0)
1.315 + UNSAFE.compareAndSwapInt(this, statusOffset,
1.316 + 0, SIGNAL);
1.317 + else {
1.318 + wait(millis);
1.319 + if (millis > 0L)
1.320 + break;
1.321 + }
1.322 + }
1.323 + }
1.324 + }
1.325 + return s;
1.326 + }
1.327 +
1.328 + /**
1.329 + * Primary execution method for stolen tasks. Unless done, calls
1.330 + * exec and records status if completed, but doesn't wait for
1.331 + * completion otherwise.
1.332 + */
1.333 + final void doExec() {
1.334 + if (status >= 0) {
1.335 + boolean completed;
1.336 + try {
1.337 + completed = exec();
1.338 + } catch (Throwable rex) {
1.339 + setExceptionalCompletion(rex);
1.340 + return;
1.341 + }
1.342 + if (completed)
1.343 + setCompletion(NORMAL); // must be outside try block
1.344 + }
1.345 + }
1.346 +
1.347 + /**
1.348 + * Primary mechanics for join, get, quietlyJoin.
1.349 + * @return status upon completion
1.350 + */
1.351 + private int doJoin() {
1.352 + Thread t; ForkJoinWorkerThread w; int s; boolean completed;
1.353 + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
1.354 + if ((s = status) < 0)
1.355 + return s;
1.356 + if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
1.357 + try {
1.358 + completed = exec();
1.359 + } catch (Throwable rex) {
1.360 + return setExceptionalCompletion(rex);
1.361 + }
1.362 + if (completed)
1.363 + return setCompletion(NORMAL);
1.364 + }
1.365 + return w.joinTask(this);
1.366 + }
1.367 + else
1.368 + return externalAwaitDone();
1.369 + }
1.370 +
1.371 + /**
1.372 + * Primary mechanics for invoke, quietlyInvoke.
1.373 + * @return status upon completion
1.374 + */
1.375 + private int doInvoke() {
1.376 + int s; boolean completed;
1.377 + if ((s = status) < 0)
1.378 + return s;
1.379 + try {
1.380 + completed = exec();
1.381 + } catch (Throwable rex) {
1.382 + return setExceptionalCompletion(rex);
1.383 + }
1.384 + if (completed)
1.385 + return setCompletion(NORMAL);
1.386 + else
1.387 + return doJoin();
1.388 + }
1.389 +
1.390 + // Exception table support
1.391 +
1.392 + /**
1.393 + * Table of exceptions thrown by tasks, to enable reporting by
1.394 + * callers. Because exceptions are rare, we don't directly keep
1.395 + * them with task objects, but instead use a weak ref table. Note
1.396 + * that cancellation exceptions don't appear in the table, but are
1.397 + * instead recorded as status values.
1.398 + *
1.399 + * Note: These statics are initialized below in static block.
1.400 + */
1.401 + private static final ExceptionNode[] exceptionTable;
1.402 + private static final ReentrantLock exceptionTableLock;
1.403 + private static final ReferenceQueue<Object> exceptionTableRefQueue;
1.404 +
1.405 + /**
1.406 + * Fixed capacity for exceptionTable.
1.407 + */
1.408 + private static final int EXCEPTION_MAP_CAPACITY = 32;
1.409 +
1.410 + /**
1.411 + * Key-value nodes for exception table. The chained hash table
1.412 + * uses identity comparisons, full locking, and weak references
1.413 + * for keys. The table has a fixed capacity because it only
1.414 + * maintains task exceptions long enough for joiners to access
1.415 + * them, so should never become very large for sustained
1.416 + * periods. However, since we do not know when the last joiner
1.417 + * completes, we must use weak references and expunge them. We do
1.418 + * so on each operation (hence full locking). Also, some thread in
1.419 + * any ForkJoinPool will call helpExpungeStaleExceptions when its
1.420 + * pool becomes isQuiescent.
1.421 + */
1.422 + static final class ExceptionNode extends WeakReference<ForkJoinTask<?>>{
1.423 + final Throwable ex;
1.424 + ExceptionNode next;
1.425 + final long thrower; // use id not ref to avoid weak cycles
1.426 + ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
1.427 + super(task, exceptionTableRefQueue);
1.428 + this.ex = ex;
1.429 + this.next = next;
1.430 + this.thrower = Thread.currentThread().getId();
1.431 + }
1.432 + }
1.433 +
1.434 + /**
1.435 + * Records exception and sets exceptional completion.
1.436 + *
1.437 + * @return status on exit
1.438 + */
1.439 + private int setExceptionalCompletion(Throwable ex) {
1.440 + int h = System.identityHashCode(this);
1.441 + final ReentrantLock lock = exceptionTableLock;
1.442 + lock.lock();
1.443 + try {
1.444 + expungeStaleExceptions();
1.445 + ExceptionNode[] t = exceptionTable;
1.446 + int i = h & (t.length - 1);
1.447 + for (ExceptionNode e = t[i]; ; e = e.next) {
1.448 + if (e == null) {
1.449 + t[i] = new ExceptionNode(this, ex, t[i]);
1.450 + break;
1.451 + }
1.452 + if (e.get() == this) // already present
1.453 + break;
1.454 + }
1.455 + } finally {
1.456 + lock.unlock();
1.457 + }
1.458 + return setCompletion(EXCEPTIONAL);
1.459 + }
1.460 +
1.461 + /**
1.462 + * Removes exception node and clears status
1.463 + */
1.464 + private void clearExceptionalCompletion() {
1.465 + int h = System.identityHashCode(this);
1.466 + final ReentrantLock lock = exceptionTableLock;
1.467 + lock.lock();
1.468 + try {
1.469 + ExceptionNode[] t = exceptionTable;
1.470 + int i = h & (t.length - 1);
1.471 + ExceptionNode e = t[i];
1.472 + ExceptionNode pred = null;
1.473 + while (e != null) {
1.474 + ExceptionNode next = e.next;
1.475 + if (e.get() == this) {
1.476 + if (pred == null)
1.477 + t[i] = next;
1.478 + else
1.479 + pred.next = next;
1.480 + break;
1.481 + }
1.482 + pred = e;
1.483 + e = next;
1.484 + }
1.485 + expungeStaleExceptions();
1.486 + status = 0;
1.487 + } finally {
1.488 + lock.unlock();
1.489 + }
1.490 + }
1.491 +
1.492 + /**
1.493 + * Returns a rethrowable exception for the given task, if
1.494 + * available. To provide accurate stack traces, if the exception
1.495 + * was not thrown by the current thread, we try to create a new
1.496 + * exception of the same type as the one thrown, but with the
1.497 + * recorded exception as its cause. If there is no such
1.498 + * constructor, we instead try to use a no-arg constructor,
1.499 + * followed by initCause, to the same effect. If none of these
1.500 + * apply, or any fail due to other exceptions, we return the
1.501 + * recorded exception, which is still correct, although it may
1.502 + * contain a misleading stack trace.
1.503 + *
1.504 + * @return the exception, or null if none
1.505 + */
1.506 + private Throwable getThrowableException() {
1.507 + if (status != EXCEPTIONAL)
1.508 + return null;
1.509 + int h = System.identityHashCode(this);
1.510 + ExceptionNode e;
1.511 + final ReentrantLock lock = exceptionTableLock;
1.512 + lock.lock();
1.513 + try {
1.514 + expungeStaleExceptions();
1.515 + ExceptionNode[] t = exceptionTable;
1.516 + e = t[h & (t.length - 1)];
1.517 + while (e != null && e.get() != this)
1.518 + e = e.next;
1.519 + } finally {
1.520 + lock.unlock();
1.521 + }
1.522 + Throwable ex;
1.523 + if (e == null || (ex = e.ex) == null)
1.524 + return null;
1.525 + if (e.thrower != Thread.currentThread().getId()) {
1.526 + Class ec = ex.getClass();
1.527 + try {
1.528 + Constructor<?> noArgCtor = null;
1.529 + Constructor<?>[] cs = ec.getConstructors();// public ctors only
1.530 + for (int i = 0; i < cs.length; ++i) {
1.531 + Constructor<?> c = cs[i];
1.532 + Class<?>[] ps = c.getParameterTypes();
1.533 + if (ps.length == 0)
1.534 + noArgCtor = c;
1.535 + else if (ps.length == 1 && ps[0] == Throwable.class)
1.536 + return (Throwable)(c.newInstance(ex));
1.537 + }
1.538 + if (noArgCtor != null) {
1.539 + Throwable wx = (Throwable)(noArgCtor.newInstance());
1.540 + wx.initCause(ex);
1.541 + return wx;
1.542 + }
1.543 + } catch (Exception ignore) {
1.544 + }
1.545 + }
1.546 + return ex;
1.547 + }
1.548 +
1.549 + /**
1.550 + * Poll stale refs and remove them. Call only while holding lock.
1.551 + */
1.552 + private static void expungeStaleExceptions() {
1.553 + for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
1.554 + if (x instanceof ExceptionNode) {
1.555 + ForkJoinTask<?> key = ((ExceptionNode)x).get();
1.556 + ExceptionNode[] t = exceptionTable;
1.557 + int i = System.identityHashCode(key) & (t.length - 1);
1.558 + ExceptionNode e = t[i];
1.559 + ExceptionNode pred = null;
1.560 + while (e != null) {
1.561 + ExceptionNode next = e.next;
1.562 + if (e == x) {
1.563 + if (pred == null)
1.564 + t[i] = next;
1.565 + else
1.566 + pred.next = next;
1.567 + break;
1.568 + }
1.569 + pred = e;
1.570 + e = next;
1.571 + }
1.572 + }
1.573 + }
1.574 + }
1.575 +
1.576 + /**
1.577 + * If lock is available, poll stale refs and remove them.
1.578 + * Called from ForkJoinPool when pools become quiescent.
1.579 + */
1.580 + static final void helpExpungeStaleExceptions() {
1.581 + final ReentrantLock lock = exceptionTableLock;
1.582 + if (lock.tryLock()) {
1.583 + try {
1.584 + expungeStaleExceptions();
1.585 + } finally {
1.586 + lock.unlock();
1.587 + }
1.588 + }
1.589 + }
1.590 +
1.591 + /**
1.592 + * Report the result of invoke or join; called only upon
1.593 + * non-normal return of internal versions.
1.594 + */
1.595 + private V reportResult() {
1.596 + int s; Throwable ex;
1.597 + if ((s = status) == CANCELLED)
1.598 + throw new CancellationException();
1.599 + if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
1.600 + UNSAFE.throwException(ex);
1.601 + return getRawResult();
1.602 + }
1.603 +
1.604 + // public methods
1.605 +
1.606 + /**
1.607 + * Arranges to asynchronously execute this task. While it is not
1.608 + * necessarily enforced, it is a usage error to fork a task more
1.609 + * than once unless it has completed and been reinitialized.
1.610 + * Subsequent modifications to the state of this task or any data
1.611 + * it operates on are not necessarily consistently observable by
1.612 + * any thread other than the one executing it unless preceded by a
1.613 + * call to {@link #join} or related methods, or a call to {@link
1.614 + * #isDone} returning {@code true}.
1.615 + *
1.616 + * <p>This method may be invoked only from within {@code
1.617 + * ForkJoinPool} computations (as may be determined using method
1.618 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.619 + * result in exceptions or errors, possibly including {@code
1.620 + * ClassCastException}.
1.621 + *
1.622 + * @return {@code this}, to simplify usage
1.623 + */
1.624 + public final ForkJoinTask<V> fork() {
1.625 + ((ForkJoinWorkerThread) Thread.currentThread())
1.626 + .pushTask(this);
1.627 + return this;
1.628 + }
1.629 +
1.630 + /**
1.631 + * Returns the result of the computation when it {@link #isDone is
1.632 + * done}. This method differs from {@link #get()} in that
1.633 + * abnormal completion results in {@code RuntimeException} or
1.634 + * {@code Error}, not {@code ExecutionException}, and that
1.635 + * interrupts of the calling thread do <em>not</em> cause the
1.636 + * method to abruptly return by throwing {@code
1.637 + * InterruptedException}.
1.638 + *
1.639 + * @return the computed result
1.640 + */
1.641 + public final V join() {
1.642 + if (doJoin() != NORMAL)
1.643 + return reportResult();
1.644 + else
1.645 + return getRawResult();
1.646 + }
1.647 +
1.648 + /**
1.649 + * Commences performing this task, awaits its completion if
1.650 + * necessary, and returns its result, or throws an (unchecked)
1.651 + * {@code RuntimeException} or {@code Error} if the underlying
1.652 + * computation did so.
1.653 + *
1.654 + * @return the computed result
1.655 + */
1.656 + public final V invoke() {
1.657 + if (doInvoke() != NORMAL)
1.658 + return reportResult();
1.659 + else
1.660 + return getRawResult();
1.661 + }
1.662 +
1.663 + /**
1.664 + * Forks the given tasks, returning when {@code isDone} holds for
1.665 + * each task or an (unchecked) exception is encountered, in which
1.666 + * case the exception is rethrown. If more than one task
1.667 + * encounters an exception, then this method throws any one of
1.668 + * these exceptions. If any task encounters an exception, the
1.669 + * other may be cancelled. However, the execution status of
1.670 + * individual tasks is not guaranteed upon exceptional return. The
1.671 + * status of each task may be obtained using {@link
1.672 + * #getException()} and related methods to check if they have been
1.673 + * cancelled, completed normally or exceptionally, or left
1.674 + * unprocessed.
1.675 + *
1.676 + * <p>This method may be invoked only from within {@code
1.677 + * ForkJoinPool} computations (as may be determined using method
1.678 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.679 + * result in exceptions or errors, possibly including {@code
1.680 + * ClassCastException}.
1.681 + *
1.682 + * @param t1 the first task
1.683 + * @param t2 the second task
1.684 + * @throws NullPointerException if any task is null
1.685 + */
1.686 + public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
1.687 + t2.fork();
1.688 + t1.invoke();
1.689 + t2.join();
1.690 + }
1.691 +
1.692 + /**
1.693 + * Forks the given tasks, returning when {@code isDone} holds for
1.694 + * each task or an (unchecked) exception is encountered, in which
1.695 + * case the exception is rethrown. If more than one task
1.696 + * encounters an exception, then this method throws any one of
1.697 + * these exceptions. If any task encounters an exception, others
1.698 + * may be cancelled. However, the execution status of individual
1.699 + * tasks is not guaranteed upon exceptional return. The status of
1.700 + * each task may be obtained using {@link #getException()} and
1.701 + * related methods to check if they have been cancelled, completed
1.702 + * normally or exceptionally, or left unprocessed.
1.703 + *
1.704 + * <p>This method may be invoked only from within {@code
1.705 + * ForkJoinPool} computations (as may be determined using method
1.706 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.707 + * result in exceptions or errors, possibly including {@code
1.708 + * ClassCastException}.
1.709 + *
1.710 + * @param tasks the tasks
1.711 + * @throws NullPointerException if any task is null
1.712 + */
1.713 + public static void invokeAll(ForkJoinTask<?>... tasks) {
1.714 + Throwable ex = null;
1.715 + int last = tasks.length - 1;
1.716 + for (int i = last; i >= 0; --i) {
1.717 + ForkJoinTask<?> t = tasks[i];
1.718 + if (t == null) {
1.719 + if (ex == null)
1.720 + ex = new NullPointerException();
1.721 + }
1.722 + else if (i != 0)
1.723 + t.fork();
1.724 + else if (t.doInvoke() < NORMAL && ex == null)
1.725 + ex = t.getException();
1.726 + }
1.727 + for (int i = 1; i <= last; ++i) {
1.728 + ForkJoinTask<?> t = tasks[i];
1.729 + if (t != null) {
1.730 + if (ex != null)
1.731 + t.cancel(false);
1.732 + else if (t.doJoin() < NORMAL && ex == null)
1.733 + ex = t.getException();
1.734 + }
1.735 + }
1.736 + if (ex != null)
1.737 + UNSAFE.throwException(ex);
1.738 + }
1.739 +
1.740 + /**
1.741 + * Forks all tasks in the specified collection, returning when
1.742 + * {@code isDone} holds for each task or an (unchecked) exception
1.743 + * is encountered, in which case the exception is rethrown. If
1.744 + * more than one task encounters an exception, then this method
1.745 + * throws any one of these exceptions. If any task encounters an
1.746 + * exception, others may be cancelled. However, the execution
1.747 + * status of individual tasks is not guaranteed upon exceptional
1.748 + * return. The status of each task may be obtained using {@link
1.749 + * #getException()} and related methods to check if they have been
1.750 + * cancelled, completed normally or exceptionally, or left
1.751 + * unprocessed.
1.752 + *
1.753 + * <p>This method may be invoked only from within {@code
1.754 + * ForkJoinPool} computations (as may be determined using method
1.755 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.756 + * result in exceptions or errors, possibly including {@code
1.757 + * ClassCastException}.
1.758 + *
1.759 + * @param tasks the collection of tasks
1.760 + * @return the tasks argument, to simplify usage
1.761 + * @throws NullPointerException if tasks or any element are null
1.762 + */
1.763 + public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
1.764 + if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
1.765 + invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
1.766 + return tasks;
1.767 + }
1.768 + @SuppressWarnings("unchecked")
1.769 + List<? extends ForkJoinTask<?>> ts =
1.770 + (List<? extends ForkJoinTask<?>>) tasks;
1.771 + Throwable ex = null;
1.772 + int last = ts.size() - 1;
1.773 + for (int i = last; i >= 0; --i) {
1.774 + ForkJoinTask<?> t = ts.get(i);
1.775 + if (t == null) {
1.776 + if (ex == null)
1.777 + ex = new NullPointerException();
1.778 + }
1.779 + else if (i != 0)
1.780 + t.fork();
1.781 + else if (t.doInvoke() < NORMAL && ex == null)
1.782 + ex = t.getException();
1.783 + }
1.784 + for (int i = 1; i <= last; ++i) {
1.785 + ForkJoinTask<?> t = ts.get(i);
1.786 + if (t != null) {
1.787 + if (ex != null)
1.788 + t.cancel(false);
1.789 + else if (t.doJoin() < NORMAL && ex == null)
1.790 + ex = t.getException();
1.791 + }
1.792 + }
1.793 + if (ex != null)
1.794 + UNSAFE.throwException(ex);
1.795 + return tasks;
1.796 + }
1.797 +
1.798 + /**
1.799 + * Attempts to cancel execution of this task. This attempt will
1.800 + * fail if the task has already completed or could not be
1.801 + * cancelled for some other reason. If successful, and this task
1.802 + * has not started when {@code cancel} is called, execution of
1.803 + * this task is suppressed. After this method returns
1.804 + * successfully, unless there is an intervening call to {@link
1.805 + * #reinitialize}, subsequent calls to {@link #isCancelled},
1.806 + * {@link #isDone}, and {@code cancel} will return {@code true}
1.807 + * and calls to {@link #join} and related methods will result in
1.808 + * {@code CancellationException}.
1.809 + *
1.810 + * <p>This method may be overridden in subclasses, but if so, must
1.811 + * still ensure that these properties hold. In particular, the
1.812 + * {@code cancel} method itself must not throw exceptions.
1.813 + *
1.814 + * <p>This method is designed to be invoked by <em>other</em>
1.815 + * tasks. To terminate the current task, you can just return or
1.816 + * throw an unchecked exception from its computation method, or
1.817 + * invoke {@link #completeExceptionally}.
1.818 + *
1.819 + * @param mayInterruptIfRunning this value has no effect in the
1.820 + * default implementation because interrupts are not used to
1.821 + * control cancellation.
1.822 + *
1.823 + * @return {@code true} if this task is now cancelled
1.824 + */
1.825 + public boolean cancel(boolean mayInterruptIfRunning) {
1.826 + return setCompletion(CANCELLED) == CANCELLED;
1.827 + }
1.828 +
1.829 + /**
1.830 + * Cancels, ignoring any exceptions thrown by cancel. Used during
1.831 + * worker and pool shutdown. Cancel is spec'ed not to throw any
1.832 + * exceptions, but if it does anyway, we have no recourse during
1.833 + * shutdown, so guard against this case.
1.834 + */
1.835 + final void cancelIgnoringExceptions() {
1.836 + try {
1.837 + cancel(false);
1.838 + } catch (Throwable ignore) {
1.839 + }
1.840 + }
1.841 +
1.842 + public final boolean isDone() {
1.843 + return status < 0;
1.844 + }
1.845 +
1.846 + public final boolean isCancelled() {
1.847 + return status == CANCELLED;
1.848 + }
1.849 +
1.850 + /**
1.851 + * Returns {@code true} if this task threw an exception or was cancelled.
1.852 + *
1.853 + * @return {@code true} if this task threw an exception or was cancelled
1.854 + */
1.855 + public final boolean isCompletedAbnormally() {
1.856 + return status < NORMAL;
1.857 + }
1.858 +
1.859 + /**
1.860 + * Returns {@code true} if this task completed without throwing an
1.861 + * exception and was not cancelled.
1.862 + *
1.863 + * @return {@code true} if this task completed without throwing an
1.864 + * exception and was not cancelled
1.865 + */
1.866 + public final boolean isCompletedNormally() {
1.867 + return status == NORMAL;
1.868 + }
1.869 +
1.870 + /**
1.871 + * Returns the exception thrown by the base computation, or a
1.872 + * {@code CancellationException} if cancelled, or {@code null} if
1.873 + * none or if the method has not yet completed.
1.874 + *
1.875 + * @return the exception, or {@code null} if none
1.876 + */
1.877 + public final Throwable getException() {
1.878 + int s = status;
1.879 + return ((s >= NORMAL) ? null :
1.880 + (s == CANCELLED) ? new CancellationException() :
1.881 + getThrowableException());
1.882 + }
1.883 +
1.884 + /**
1.885 + * Completes this task abnormally, and if not already aborted or
1.886 + * cancelled, causes it to throw the given exception upon
1.887 + * {@code join} and related operations. This method may be used
1.888 + * to induce exceptions in asynchronous tasks, or to force
1.889 + * completion of tasks that would not otherwise complete. Its use
1.890 + * in other situations is discouraged. This method is
1.891 + * overridable, but overridden versions must invoke {@code super}
1.892 + * implementation to maintain guarantees.
1.893 + *
1.894 + * @param ex the exception to throw. If this exception is not a
1.895 + * {@code RuntimeException} or {@code Error}, the actual exception
1.896 + * thrown will be a {@code RuntimeException} with cause {@code ex}.
1.897 + */
1.898 + public void completeExceptionally(Throwable ex) {
1.899 + setExceptionalCompletion((ex instanceof RuntimeException) ||
1.900 + (ex instanceof Error) ? ex :
1.901 + new RuntimeException(ex));
1.902 + }
1.903 +
1.904 + /**
1.905 + * Completes this task, and if not already aborted or cancelled,
1.906 + * returning the given value as the result of subsequent
1.907 + * invocations of {@code join} and related operations. This method
1.908 + * may be used to provide results for asynchronous tasks, or to
1.909 + * provide alternative handling for tasks that would not otherwise
1.910 + * complete normally. Its use in other situations is
1.911 + * discouraged. This method is overridable, but overridden
1.912 + * versions must invoke {@code super} implementation to maintain
1.913 + * guarantees.
1.914 + *
1.915 + * @param value the result value for this task
1.916 + */
1.917 + public void complete(V value) {
1.918 + try {
1.919 + setRawResult(value);
1.920 + } catch (Throwable rex) {
1.921 + setExceptionalCompletion(rex);
1.922 + return;
1.923 + }
1.924 + setCompletion(NORMAL);
1.925 + }
1.926 +
1.927 + /**
1.928 + * Waits if necessary for the computation to complete, and then
1.929 + * retrieves its result.
1.930 + *
1.931 + * @return the computed result
1.932 + * @throws CancellationException if the computation was cancelled
1.933 + * @throws ExecutionException if the computation threw an
1.934 + * exception
1.935 + * @throws InterruptedException if the current thread is not a
1.936 + * member of a ForkJoinPool and was interrupted while waiting
1.937 + */
1.938 + public final V get() throws InterruptedException, ExecutionException {
1.939 + int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
1.940 + doJoin() : externalInterruptibleAwaitDone(0L);
1.941 + Throwable ex;
1.942 + if (s == CANCELLED)
1.943 + throw new CancellationException();
1.944 + if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
1.945 + throw new ExecutionException(ex);
1.946 + return getRawResult();
1.947 + }
1.948 +
1.949 + /**
1.950 + * Waits if necessary for at most the given time for the computation
1.951 + * to complete, and then retrieves its result, if available.
1.952 + *
1.953 + * @param timeout the maximum time to wait
1.954 + * @param unit the time unit of the timeout argument
1.955 + * @return the computed result
1.956 + * @throws CancellationException if the computation was cancelled
1.957 + * @throws ExecutionException if the computation threw an
1.958 + * exception
1.959 + * @throws InterruptedException if the current thread is not a
1.960 + * member of a ForkJoinPool and was interrupted while waiting
1.961 + * @throws TimeoutException if the wait timed out
1.962 + */
1.963 + public final V get(long timeout, TimeUnit unit)
1.964 + throws InterruptedException, ExecutionException, TimeoutException {
1.965 + Thread t = Thread.currentThread();
1.966 + if (t instanceof ForkJoinWorkerThread) {
1.967 + ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
1.968 + long nanos = unit.toNanos(timeout);
1.969 + if (status >= 0) {
1.970 + boolean completed = false;
1.971 + if (w.unpushTask(this)) {
1.972 + try {
1.973 + completed = exec();
1.974 + } catch (Throwable rex) {
1.975 + setExceptionalCompletion(rex);
1.976 + }
1.977 + }
1.978 + if (completed)
1.979 + setCompletion(NORMAL);
1.980 + else if (status >= 0 && nanos > 0)
1.981 + w.pool.timedAwaitJoin(this, nanos);
1.982 + }
1.983 + }
1.984 + else {
1.985 + long millis = unit.toMillis(timeout);
1.986 + if (millis > 0)
1.987 + externalInterruptibleAwaitDone(millis);
1.988 + }
1.989 + int s = status;
1.990 + if (s != NORMAL) {
1.991 + Throwable ex;
1.992 + if (s == CANCELLED)
1.993 + throw new CancellationException();
1.994 + if (s != EXCEPTIONAL)
1.995 + throw new TimeoutException();
1.996 + if ((ex = getThrowableException()) != null)
1.997 + throw new ExecutionException(ex);
1.998 + }
1.999 + return getRawResult();
1.1000 + }
1.1001 +
1.1002 + /**
1.1003 + * Joins this task, without returning its result or throwing its
1.1004 + * exception. This method may be useful when processing
1.1005 + * collections of tasks when some have been cancelled or otherwise
1.1006 + * known to have aborted.
1.1007 + */
1.1008 + public final void quietlyJoin() {
1.1009 + doJoin();
1.1010 + }
1.1011 +
1.1012 + /**
1.1013 + * Commences performing this task and awaits its completion if
1.1014 + * necessary, without returning its result or throwing its
1.1015 + * exception.
1.1016 + */
1.1017 + public final void quietlyInvoke() {
1.1018 + doInvoke();
1.1019 + }
1.1020 +
1.1021 + /**
1.1022 + * Possibly executes tasks until the pool hosting the current task
1.1023 + * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
1.1024 + * be of use in designs in which many tasks are forked, but none
1.1025 + * are explicitly joined, instead executing them until all are
1.1026 + * processed.
1.1027 + *
1.1028 + * <p>This method may be invoked only from within {@code
1.1029 + * ForkJoinPool} computations (as may be determined using method
1.1030 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.1031 + * result in exceptions or errors, possibly including {@code
1.1032 + * ClassCastException}.
1.1033 + */
1.1034 + public static void helpQuiesce() {
1.1035 + ((ForkJoinWorkerThread) Thread.currentThread())
1.1036 + .helpQuiescePool();
1.1037 + }
1.1038 +
1.1039 + /**
1.1040 + * Resets the internal bookkeeping state of this task, allowing a
1.1041 + * subsequent {@code fork}. This method allows repeated reuse of
1.1042 + * this task, but only if reuse occurs when this task has either
1.1043 + * never been forked, or has been forked, then completed and all
1.1044 + * outstanding joins of this task have also completed. Effects
1.1045 + * under any other usage conditions are not guaranteed.
1.1046 + * This method may be useful when executing
1.1047 + * pre-constructed trees of subtasks in loops.
1.1048 + *
1.1049 + * <p>Upon completion of this method, {@code isDone()} reports
1.1050 + * {@code false}, and {@code getException()} reports {@code
1.1051 + * null}. However, the value returned by {@code getRawResult} is
1.1052 + * unaffected. To clear this value, you can invoke {@code
1.1053 + * setRawResult(null)}.
1.1054 + */
1.1055 + public void reinitialize() {
1.1056 + if (status == EXCEPTIONAL)
1.1057 + clearExceptionalCompletion();
1.1058 + else
1.1059 + status = 0;
1.1060 + }
1.1061 +
1.1062 + /**
1.1063 + * Returns the pool hosting the current task execution, or null
1.1064 + * if this task is executing outside of any ForkJoinPool.
1.1065 + *
1.1066 + * @see #inForkJoinPool
1.1067 + * @return the pool, or {@code null} if none
1.1068 + */
1.1069 + public static ForkJoinPool getPool() {
1.1070 + Thread t = Thread.currentThread();
1.1071 + return (t instanceof ForkJoinWorkerThread) ?
1.1072 + ((ForkJoinWorkerThread) t).pool : null;
1.1073 + }
1.1074 +
1.1075 + /**
1.1076 + * Returns {@code true} if the current thread is a {@link
1.1077 + * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
1.1078 + *
1.1079 + * @return {@code true} if the current thread is a {@link
1.1080 + * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
1.1081 + * or {@code false} otherwise
1.1082 + */
1.1083 + public static boolean inForkJoinPool() {
1.1084 + return Thread.currentThread() instanceof ForkJoinWorkerThread;
1.1085 + }
1.1086 +
1.1087 + /**
1.1088 + * Tries to unschedule this task for execution. This method will
1.1089 + * typically succeed if this task is the most recently forked task
1.1090 + * by the current thread, and has not commenced executing in
1.1091 + * another thread. This method may be useful when arranging
1.1092 + * alternative local processing of tasks that could have been, but
1.1093 + * were not, stolen.
1.1094 + *
1.1095 + * <p>This method may be invoked only from within {@code
1.1096 + * ForkJoinPool} computations (as may be determined using method
1.1097 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.1098 + * result in exceptions or errors, possibly including {@code
1.1099 + * ClassCastException}.
1.1100 + *
1.1101 + * @return {@code true} if unforked
1.1102 + */
1.1103 + public boolean tryUnfork() {
1.1104 + return ((ForkJoinWorkerThread) Thread.currentThread())
1.1105 + .unpushTask(this);
1.1106 + }
1.1107 +
1.1108 + /**
1.1109 + * Returns an estimate of the number of tasks that have been
1.1110 + * forked by the current worker thread but not yet executed. This
1.1111 + * value may be useful for heuristic decisions about whether to
1.1112 + * fork other tasks.
1.1113 + *
1.1114 + * <p>This method may be invoked only from within {@code
1.1115 + * ForkJoinPool} computations (as may be determined using method
1.1116 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.1117 + * result in exceptions or errors, possibly including {@code
1.1118 + * ClassCastException}.
1.1119 + *
1.1120 + * @return the number of tasks
1.1121 + */
1.1122 + public static int getQueuedTaskCount() {
1.1123 + return ((ForkJoinWorkerThread) Thread.currentThread())
1.1124 + .getQueueSize();
1.1125 + }
1.1126 +
1.1127 + /**
1.1128 + * Returns an estimate of how many more locally queued tasks are
1.1129 + * held by the current worker thread than there are other worker
1.1130 + * threads that might steal them. This value may be useful for
1.1131 + * heuristic decisions about whether to fork other tasks. In many
1.1132 + * usages of ForkJoinTasks, at steady state, each worker should
1.1133 + * aim to maintain a small constant surplus (for example, 3) of
1.1134 + * tasks, and to process computations locally if this threshold is
1.1135 + * exceeded.
1.1136 + *
1.1137 + * <p>This method may be invoked only from within {@code
1.1138 + * ForkJoinPool} computations (as may be determined using method
1.1139 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.1140 + * result in exceptions or errors, possibly including {@code
1.1141 + * ClassCastException}.
1.1142 + *
1.1143 + * @return the surplus number of tasks, which may be negative
1.1144 + */
1.1145 + public static int getSurplusQueuedTaskCount() {
1.1146 + return ((ForkJoinWorkerThread) Thread.currentThread())
1.1147 + .getEstimatedSurplusTaskCount();
1.1148 + }
1.1149 +
1.1150 + // Extension methods
1.1151 +
1.1152 + /**
1.1153 + * Returns the result that would be returned by {@link #join}, even
1.1154 + * if this task completed abnormally, or {@code null} if this task
1.1155 + * is not known to have been completed. This method is designed
1.1156 + * to aid debugging, as well as to support extensions. Its use in
1.1157 + * any other context is discouraged.
1.1158 + *
1.1159 + * @return the result, or {@code null} if not completed
1.1160 + */
1.1161 + public abstract V getRawResult();
1.1162 +
1.1163 + /**
1.1164 + * Forces the given value to be returned as a result. This method
1.1165 + * is designed to support extensions, and should not in general be
1.1166 + * called otherwise.
1.1167 + *
1.1168 + * @param value the value
1.1169 + */
1.1170 + protected abstract void setRawResult(V value);
1.1171 +
1.1172 + /**
1.1173 + * Immediately performs the base action of this task. This method
1.1174 + * is designed to support extensions, and should not in general be
1.1175 + * called otherwise. The return value controls whether this task
1.1176 + * is considered to be done normally. It may return false in
1.1177 + * asynchronous actions that require explicit invocations of
1.1178 + * {@link #complete} to become joinable. It may also throw an
1.1179 + * (unchecked) exception to indicate abnormal exit.
1.1180 + *
1.1181 + * @return {@code true} if completed normally
1.1182 + */
1.1183 + protected abstract boolean exec();
1.1184 +
1.1185 + /**
1.1186 + * Returns, but does not unschedule or execute, a task queued by
1.1187 + * the current thread but not yet executed, if one is immediately
1.1188 + * available. There is no guarantee that this task will actually
1.1189 + * be polled or executed next. Conversely, this method may return
1.1190 + * null even if a task exists but cannot be accessed without
1.1191 + * contention with other threads. This method is designed
1.1192 + * primarily to support extensions, and is unlikely to be useful
1.1193 + * otherwise.
1.1194 + *
1.1195 + * <p>This method may be invoked only from within {@code
1.1196 + * ForkJoinPool} computations (as may be determined using method
1.1197 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.1198 + * result in exceptions or errors, possibly including {@code
1.1199 + * ClassCastException}.
1.1200 + *
1.1201 + * @return the next task, or {@code null} if none are available
1.1202 + */
1.1203 + protected static ForkJoinTask<?> peekNextLocalTask() {
1.1204 + return ((ForkJoinWorkerThread) Thread.currentThread())
1.1205 + .peekTask();
1.1206 + }
1.1207 +
1.1208 + /**
1.1209 + * Unschedules and returns, without executing, the next task
1.1210 + * queued by the current thread but not yet executed. This method
1.1211 + * is designed primarily to support extensions, and is unlikely to
1.1212 + * be useful otherwise.
1.1213 + *
1.1214 + * <p>This method may be invoked only from within {@code
1.1215 + * ForkJoinPool} computations (as may be determined using method
1.1216 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.1217 + * result in exceptions or errors, possibly including {@code
1.1218 + * ClassCastException}.
1.1219 + *
1.1220 + * @return the next task, or {@code null} if none are available
1.1221 + */
1.1222 + protected static ForkJoinTask<?> pollNextLocalTask() {
1.1223 + return ((ForkJoinWorkerThread) Thread.currentThread())
1.1224 + .pollLocalTask();
1.1225 + }
1.1226 +
1.1227 + /**
1.1228 + * Unschedules and returns, without executing, the next task
1.1229 + * queued by the current thread but not yet executed, if one is
1.1230 + * available, or if not available, a task that was forked by some
1.1231 + * other thread, if available. Availability may be transient, so a
1.1232 + * {@code null} result does not necessarily imply quiescence
1.1233 + * of the pool this task is operating in. This method is designed
1.1234 + * primarily to support extensions, and is unlikely to be useful
1.1235 + * otherwise.
1.1236 + *
1.1237 + * <p>This method may be invoked only from within {@code
1.1238 + * ForkJoinPool} computations (as may be determined using method
1.1239 + * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1.1240 + * result in exceptions or errors, possibly including {@code
1.1241 + * ClassCastException}.
1.1242 + *
1.1243 + * @return a task, or {@code null} if none are available
1.1244 + */
1.1245 + protected static ForkJoinTask<?> pollTask() {
1.1246 + return ((ForkJoinWorkerThread) Thread.currentThread())
1.1247 + .pollTask();
1.1248 + }
1.1249 +
1.1250 + /**
1.1251 + * Adaptor for Runnables. This implements RunnableFuture
1.1252 + * to be compliant with AbstractExecutorService constraints
1.1253 + * when used in ForkJoinPool.
1.1254 + */
1.1255 + static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1.1256 + implements RunnableFuture<T> {
1.1257 + final Runnable runnable;
1.1258 + final T resultOnCompletion;
1.1259 + T result;
1.1260 + AdaptedRunnable(Runnable runnable, T result) {
1.1261 + if (runnable == null) throw new NullPointerException();
1.1262 + this.runnable = runnable;
1.1263 + this.resultOnCompletion = result;
1.1264 + }
1.1265 + public T getRawResult() { return result; }
1.1266 + public void setRawResult(T v) { result = v; }
1.1267 + public boolean exec() {
1.1268 + runnable.run();
1.1269 + result = resultOnCompletion;
1.1270 + return true;
1.1271 + }
1.1272 + public void run() { invoke(); }
1.1273 + private static final long serialVersionUID = 5232453952276885070L;
1.1274 + }
1.1275 +
1.1276 + /**
1.1277 + * Adaptor for Callables
1.1278 + */
1.1279 + static final class AdaptedCallable<T> extends ForkJoinTask<T>
1.1280 + implements RunnableFuture<T> {
1.1281 + final Callable<? extends T> callable;
1.1282 + T result;
1.1283 + AdaptedCallable(Callable<? extends T> callable) {
1.1284 + if (callable == null) throw new NullPointerException();
1.1285 + this.callable = callable;
1.1286 + }
1.1287 + public T getRawResult() { return result; }
1.1288 + public void setRawResult(T v) { result = v; }
1.1289 + public boolean exec() {
1.1290 + try {
1.1291 + result = callable.call();
1.1292 + return true;
1.1293 + } catch (Error err) {
1.1294 + throw err;
1.1295 + } catch (RuntimeException rex) {
1.1296 + throw rex;
1.1297 + } catch (Exception ex) {
1.1298 + throw new RuntimeException(ex);
1.1299 + }
1.1300 + }
1.1301 + public void run() { invoke(); }
1.1302 + private static final long serialVersionUID = 2838392045355241008L;
1.1303 + }
1.1304 +
1.1305 + /**
1.1306 + * Returns a new {@code ForkJoinTask} that performs the {@code run}
1.1307 + * method of the given {@code Runnable} as its action, and returns
1.1308 + * a null result upon {@link #join}.
1.1309 + *
1.1310 + * @param runnable the runnable action
1.1311 + * @return the task
1.1312 + */
1.1313 + public static ForkJoinTask<?> adapt(Runnable runnable) {
1.1314 + return new AdaptedRunnable<Void>(runnable, null);
1.1315 + }
1.1316 +
1.1317 + /**
1.1318 + * Returns a new {@code ForkJoinTask} that performs the {@code run}
1.1319 + * method of the given {@code Runnable} as its action, and returns
1.1320 + * the given result upon {@link #join}.
1.1321 + *
1.1322 + * @param runnable the runnable action
1.1323 + * @param result the result upon completion
1.1324 + * @return the task
1.1325 + */
1.1326 + public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1.1327 + return new AdaptedRunnable<T>(runnable, result);
1.1328 + }
1.1329 +
1.1330 + /**
1.1331 + * Returns a new {@code ForkJoinTask} that performs the {@code call}
1.1332 + * method of the given {@code Callable} as its action, and returns
1.1333 + * its result upon {@link #join}, translating any checked exceptions
1.1334 + * encountered into {@code RuntimeException}.
1.1335 + *
1.1336 + * @param callable the callable action
1.1337 + * @return the task
1.1338 + */
1.1339 + public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1.1340 + return new AdaptedCallable<T>(callable);
1.1341 + }
1.1342 +
1.1343 + // Serialization support
1.1344 +
1.1345 + private static final long serialVersionUID = -7721805057305804111L;
1.1346 +
1.1347 + /**
1.1348 + * Saves the state to a stream (that is, serializes it).
1.1349 + *
1.1350 + * @serialData the current run status and the exception thrown
1.1351 + * during execution, or {@code null} if none
1.1352 + * @param s the stream
1.1353 + */
1.1354 + private void writeObject(java.io.ObjectOutputStream s)
1.1355 + throws java.io.IOException {
1.1356 + s.defaultWriteObject();
1.1357 + s.writeObject(getException());
1.1358 + }
1.1359 +
1.1360 + /**
1.1361 + * Reconstitutes the instance from a stream (that is, deserializes it).
1.1362 + *
1.1363 + * @param s the stream
1.1364 + */
1.1365 + private void readObject(java.io.ObjectInputStream s)
1.1366 + throws java.io.IOException, ClassNotFoundException {
1.1367 + s.defaultReadObject();
1.1368 + Object ex = s.readObject();
1.1369 + if (ex != null)
1.1370 + setExceptionalCompletion((Throwable)ex);
1.1371 + }
1.1372 +
1.1373 + // Unsafe mechanics
1.1374 + private static final sun.misc.Unsafe UNSAFE;
1.1375 + private static final long statusOffset;
1.1376 + static {
1.1377 + exceptionTableLock = new ReentrantLock();
1.1378 + exceptionTableRefQueue = new ReferenceQueue<Object>();
1.1379 + exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
1.1380 + try {
1.1381 + UNSAFE = sun.misc.Unsafe.getUnsafe();
1.1382 + statusOffset = UNSAFE.objectFieldOffset
1.1383 + (ForkJoinTask.class.getDeclaredField("status"));
1.1384 + } catch (Exception e) {
1.1385 + throw new Error(e);
1.1386 + }
1.1387 + }
1.1388 +
1.1389 +}