rt/emul/compact/src/main/java/java/util/concurrent/FutureTask.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branchjdk7-b147
changeset 1890 212417b74b72
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 import java.util.concurrent.locks.*;
    38 
    39 /**
    40  * A cancellable asynchronous computation.  This class provides a base
    41  * implementation of {@link Future}, with methods to start and cancel
    42  * a computation, query to see if the computation is complete, and
    43  * retrieve the result of the computation.  The result can only be
    44  * retrieved when the computation has completed; the <tt>get</tt>
    45  * method will block if the computation has not yet completed.  Once
    46  * the computation has completed, the computation cannot be restarted
    47  * or cancelled.
    48  *
    49  * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
    50  * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
    51  * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
    52  * submitted to an {@link Executor} for execution.
    53  *
    54  * <p>In addition to serving as a standalone class, this class provides
    55  * <tt>protected</tt> functionality that may be useful when creating
    56  * customized task classes.
    57  *
    58  * @since 1.5
    59  * @author Doug Lea
    60  * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
    61  */
    62 public class FutureTask<V> implements RunnableFuture<V> {
    63     /** Synchronization control for FutureTask */
    64     private final Sync sync;
    65 
    66     /**
    67      * Creates a <tt>FutureTask</tt> that will, upon running, execute the
    68      * given <tt>Callable</tt>.
    69      *
    70      * @param  callable the callable task
    71      * @throws NullPointerException if callable is null
    72      */
    73     public FutureTask(Callable<V> callable) {
    74         if (callable == null)
    75             throw new NullPointerException();
    76         sync = new Sync(callable);
    77     }
    78 
    79     /**
    80      * Creates a <tt>FutureTask</tt> that will, upon running, execute the
    81      * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
    82      * given result on successful completion.
    83      *
    84      * @param runnable the runnable task
    85      * @param result the result to return on successful completion. If
    86      * you don't need a particular result, consider using
    87      * constructions of the form:
    88      * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
    89      * @throws NullPointerException if runnable is null
    90      */
    91     public FutureTask(Runnable runnable, V result) {
    92         sync = new Sync(Executors.callable(runnable, result));
    93     }
    94 
    95     public boolean isCancelled() {
    96         return sync.innerIsCancelled();
    97     }
    98 
    99     public boolean isDone() {
   100         return sync.innerIsDone();
   101     }
   102 
   103     public boolean cancel(boolean mayInterruptIfRunning) {
   104         return sync.innerCancel(mayInterruptIfRunning);
   105     }
   106 
   107     /**
   108      * @throws CancellationException {@inheritDoc}
   109      */
   110     public V get() throws InterruptedException, ExecutionException {
   111         return sync.innerGet();
   112     }
   113 
   114     /**
   115      * @throws CancellationException {@inheritDoc}
   116      */
   117     public V get(long timeout, TimeUnit unit)
   118         throws InterruptedException, ExecutionException, TimeoutException {
   119         return sync.innerGet(unit.toNanos(timeout));
   120     }
   121 
   122     /**
   123      * Protected method invoked when this task transitions to state
   124      * <tt>isDone</tt> (whether normally or via cancellation). The
   125      * default implementation does nothing.  Subclasses may override
   126      * this method to invoke completion callbacks or perform
   127      * bookkeeping. Note that you can query status inside the
   128      * implementation of this method to determine whether this task
   129      * has been cancelled.
   130      */
   131     protected void done() { }
   132 
   133     /**
   134      * Sets the result of this Future to the given value unless
   135      * this future has already been set or has been cancelled.
   136      * This method is invoked internally by the <tt>run</tt> method
   137      * upon successful completion of the computation.
   138      * @param v the value
   139      */
   140     protected void set(V v) {
   141         sync.innerSet(v);
   142     }
   143 
   144     /**
   145      * Causes this future to report an <tt>ExecutionException</tt>
   146      * with the given throwable as its cause, unless this Future has
   147      * already been set or has been cancelled.
   148      * This method is invoked internally by the <tt>run</tt> method
   149      * upon failure of the computation.
   150      * @param t the cause of failure
   151      */
   152     protected void setException(Throwable t) {
   153         sync.innerSetException(t);
   154     }
   155 
   156     // The following (duplicated) doc comment can be removed once
   157     //
   158     // 6270645: Javadoc comments should be inherited from most derived
   159     //          superinterface or superclass
   160     // is fixed.
   161     /**
   162      * Sets this Future to the result of its computation
   163      * unless it has been cancelled.
   164      */
   165     public void run() {
   166         sync.innerRun();
   167     }
   168 
   169     /**
   170      * Executes the computation without setting its result, and then
   171      * resets this Future to initial state, failing to do so if the
   172      * computation encounters an exception or is cancelled.  This is
   173      * designed for use with tasks that intrinsically execute more
   174      * than once.
   175      * @return true if successfully run and reset
   176      */
   177     protected boolean runAndReset() {
   178         return sync.innerRunAndReset();
   179     }
   180 
   181     /**
   182      * Synchronization control for FutureTask. Note that this must be
   183      * a non-static inner class in order to invoke the protected
   184      * <tt>done</tt> method. For clarity, all inner class support
   185      * methods are same as outer, prefixed with "inner".
   186      *
   187      * Uses AQS sync state to represent run status
   188      */
   189     private final class Sync extends AbstractQueuedSynchronizer {
   190         private static final long serialVersionUID = -7828117401763700385L;
   191 
   192         /** State value representing that task is ready to run */
   193         private static final int READY     = 0;
   194         /** State value representing that task is running */
   195         private static final int RUNNING   = 1;
   196         /** State value representing that task ran */
   197         private static final int RAN       = 2;
   198         /** State value representing that task was cancelled */
   199         private static final int CANCELLED = 4;
   200 
   201         /** The underlying callable */
   202         private final Callable<V> callable;
   203         /** The result to return from get() */
   204         private V result;
   205         /** The exception to throw from get() */
   206         private Throwable exception;
   207 
   208         /**
   209          * The thread running task. When nulled after set/cancel, this
   210          * indicates that the results are accessible.  Must be
   211          * volatile, to ensure visibility upon completion.
   212          */
   213         private volatile Thread runner;
   214 
   215         Sync(Callable<V> callable) {
   216             this.callable = callable;
   217         }
   218 
   219         private boolean ranOrCancelled(int state) {
   220             return (state & (RAN | CANCELLED)) != 0;
   221         }
   222 
   223         /**
   224          * Implements AQS base acquire to succeed if ran or cancelled
   225          */
   226         protected int tryAcquireShared(int ignore) {
   227             return innerIsDone() ? 1 : -1;
   228         }
   229 
   230         /**
   231          * Implements AQS base release to always signal after setting
   232          * final done status by nulling runner thread.
   233          */
   234         protected boolean tryReleaseShared(int ignore) {
   235             runner = null;
   236             return true;
   237         }
   238 
   239         boolean innerIsCancelled() {
   240             return getState() == CANCELLED;
   241         }
   242 
   243         boolean innerIsDone() {
   244             return ranOrCancelled(getState()) && runner == null;
   245         }
   246 
   247         V innerGet() throws InterruptedException, ExecutionException {
   248             acquireSharedInterruptibly(0);
   249             if (getState() == CANCELLED)
   250                 throw new CancellationException();
   251             if (exception != null)
   252                 throw new ExecutionException(exception);
   253             return result;
   254         }
   255 
   256         V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
   257             if (!tryAcquireSharedNanos(0, nanosTimeout))
   258                 throw new TimeoutException();
   259             if (getState() == CANCELLED)
   260                 throw new CancellationException();
   261             if (exception != null)
   262                 throw new ExecutionException(exception);
   263             return result;
   264         }
   265 
   266         void innerSet(V v) {
   267             for (;;) {
   268                 int s = getState();
   269                 if (s == RAN)
   270                     return;
   271                 if (s == CANCELLED) {
   272                     // aggressively release to set runner to null,
   273                     // in case we are racing with a cancel request
   274                     // that will try to interrupt runner
   275                     releaseShared(0);
   276                     return;
   277                 }
   278                 if (compareAndSetState(s, RAN)) {
   279                     result = v;
   280                     releaseShared(0);
   281                     done();
   282                     return;
   283                 }
   284             }
   285         }
   286 
   287         void innerSetException(Throwable t) {
   288             for (;;) {
   289                 int s = getState();
   290                 if (s == RAN)
   291                     return;
   292                 if (s == CANCELLED) {
   293                     // aggressively release to set runner to null,
   294                     // in case we are racing with a cancel request
   295                     // that will try to interrupt runner
   296                     releaseShared(0);
   297                     return;
   298                 }
   299                 if (compareAndSetState(s, RAN)) {
   300                     exception = t;
   301                     releaseShared(0);
   302                     done();
   303                     return;
   304                 }
   305             }
   306         }
   307 
   308         boolean innerCancel(boolean mayInterruptIfRunning) {
   309             for (;;) {
   310                 int s = getState();
   311                 if (ranOrCancelled(s))
   312                     return false;
   313                 if (compareAndSetState(s, CANCELLED))
   314                     break;
   315             }
   316             if (mayInterruptIfRunning) {
   317                 Thread r = runner;
   318                 if (r != null)
   319                     r.interrupt();
   320             }
   321             releaseShared(0);
   322             done();
   323             return true;
   324         }
   325 
   326         void innerRun() {
   327             if (!compareAndSetState(READY, RUNNING))
   328                 return;
   329 
   330             runner = Thread.currentThread();
   331             if (getState() == RUNNING) { // recheck after setting thread
   332                 V result;
   333                 try {
   334                     result = callable.call();
   335                 } catch (Throwable ex) {
   336                     setException(ex);
   337                     return;
   338                 }
   339                 set(result);
   340             } else {
   341                 releaseShared(0); // cancel
   342             }
   343         }
   344 
   345         boolean innerRunAndReset() {
   346             if (!compareAndSetState(READY, RUNNING))
   347                 return false;
   348             try {
   349                 runner = Thread.currentThread();
   350                 if (getState() == RUNNING)
   351                     callable.call(); // don't set result
   352                 runner = null;
   353                 return compareAndSetState(RUNNING, READY);
   354             } catch (Throwable ex) {
   355                 setException(ex);
   356                 return false;
   357             }
   358         }
   359     }
   360 }