rt/emul/compact/src/main/java/java/util/concurrent/AbstractExecutorService.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branch jdk7-b147
changeset 1890 212417b74b72
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 import java.util.*;
    38 
    /**
     * Skeletal {@link ExecutorService} that builds the <tt>submit</tt>,
     * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods on top of
     * <tt>execute</tt>.  Every task handed to those methods is first wrapped
     * in a {@link RunnableFuture} obtained from <tt>newTaskFor</tt>; unless
     * overridden, that wrapper is the {@link FutureTask} class provided in
     * this package.  For example, <tt>submit(Runnable)</tt> creates the
     * associated <tt>RunnableFuture</tt>, passes it to <tt>execute</tt> and
     * returns it.  Subclasses may override the <tt>newTaskFor</tt> methods to
     * return <tt>RunnableFuture</tt> implementations other than
     * <tt>FutureTask</tt>.
     *
     * <p> <b>Extension example</b>. Here is a sketch of a class
     * that customizes {@link ThreadPoolExecutor} to use
     * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
     *  <pre> {@code
     * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
     *
     *   static class CustomTask<V> implements RunnableFuture<V> {...}
     *
     *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
     *       return new CustomTask<V>(c);
     *   }
     *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
     *       return new CustomTask<V>(r, v);
     *   }
     *   // ... add constructors, etc.
     * }}</pre>
     *
     * @since 1.5
     * @author Doug Lea
     */
    public abstract class AbstractExecutorService implements ExecutorService {

        /**
         * Wraps a runnable and a fixed result value in a
         * <tt>RunnableFuture</tt>.
         *
         * @param runnable the runnable task being wrapped
         * @param value the default value for the returned future
         * @return a <tt>RunnableFuture</tt> which when run will run the
         * underlying runnable and which, as a <tt>Future</tt>, will yield
         * the given value as its result and provide for cancellation of
         * the underlying task.
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            RunnableFuture<T> wrapped = new FutureTask<T>(runnable, value);
            return wrapped;
        }

        /**
         * Wraps a callable in a <tt>RunnableFuture</tt>.
         *
         * @param callable the callable task being wrapped
         * @return a <tt>RunnableFuture</tt> which when run will call the
         * underlying callable and which, as a <tt>Future</tt>, will yield
         * the callable's result as its result and provide for
         * cancellation of the underlying task.
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> wrapped = new FutureTask<T>(callable);
            return wrapped;
        }

        /**
         * Wraps the task with a <tt>null</tt> result, passes the wrapper to
         * <tt>execute</tt> and returns it.
         *
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public Future<?> submit(Runnable task) {
            if (task == null) {
                throw new NullPointerException();
            }
            RunnableFuture<Void> wrapped = newTaskFor(task, null);
            execute(wrapped);
            return wrapped;
        }

        /**
         * Wraps the task together with the given result, passes the wrapper
         * to <tt>execute</tt> and returns it.
         *
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) {
                throw new NullPointerException();
            }
            RunnableFuture<T> wrapped = newTaskFor(task, result);
            execute(wrapped);
            return wrapped;
        }

        /**
         * Wraps the callable, passes the wrapper to <tt>execute</tt> and
         * returns it.
         *
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) {
                throw new NullPointerException();
            }
            RunnableFuture<T> wrapped = newTaskFor(task);
            execute(wrapped);
            return wrapped;
        }

        /**
         * Requests cancellation, with interruption, of every future in the
         * given list.
         */
        private static <T> void cancelAll(List<Future<T>> futures) {
            for (Future<T> future : futures) {
                future.cancel(true);
            }
        }

        /**
         * Shared engine behind both <tt>invokeAny</tt> variants.
         *
         * <p>Tasks are submitted one at a time through an
         * {@link ExecutorCompletionService}; before each further submission
         * the completion queue is polled, so an already finished task can
         * end the call without the remaining tasks being started.  Whatever
         * the outcome, every future created here is cancelled on the way out.
         *
         * @param tasks the candidate tasks
         * @param timed whether <tt>nanos</tt> bounds the waiting
         * @param nanos remaining wait budget in nanoseconds; only consulted
         *        when <tt>timed</tt> is true
         * @return the result of the first task found to have completed
         *         without throwing
         * @throws NullPointerException if <tt>tasks</tt> is null
         * @throws IllegalArgumentException if <tt>tasks</tt> is empty
         * @throws ExecutionException if every task failed; the exception
         *         is the one recorded for the last failure observed
         * @throws TimeoutException if a timed wait on the completion queue
         *         ends without any task having completed
         * @throws InterruptedException if interrupted while waiting
         */
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                  boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null) {
                throw new NullPointerException();
            }
            int unsubmitted = tasks.size();
            if (unsubmitted == 0) {
                throw new IllegalArgumentException();
            }
            List<Future<T>> submitted = new ArrayList<Future<T>>(unsubmitted);
            ExecutorCompletionService<T> completions =
                new ExecutorCompletionService<T>(this);

            // Checking for finished work before submitting more keeps
            // executors with little parallelism from being flooded; that
            // interleaving, together with the failure bookkeeping, is what
            // shapes the loop below.
            try {
                // Remember the most recent failure so that it can be thrown
                // if no task ever produces a result.
                ExecutionException lastFailure = null;
                long lastTime = timed ? System.nanoTime() : 0;
                Iterator<? extends Callable<T>> source = tasks.iterator();

                // The first task is always started; the others follow lazily.
                submitted.add(completions.submit(source.next()));
                unsubmitted--;
                int running = 1;

                while (true) {
                    Future<T> finished = completions.poll();
                    if (finished == null) {
                        if (unsubmitted > 0) {
                            // Nothing has completed yet: start one more task
                            // and poll again.
                            unsubmitted--;
                            submitted.add(completions.submit(source.next()));
                            running++;
                            continue;
                        }
                        if (running == 0) {
                            // Everything was submitted and has already
                            // been consumed without yielding a result.
                            break;
                        }
                        if (timed) {
                            finished = completions.poll(nanos, TimeUnit.NANOSECONDS);
                            if (finished == null) {
                                throw new TimeoutException();
                            }
                            long now = System.nanoTime();
                            nanos -= now - lastTime;
                            lastTime = now;
                        } else {
                            finished = completions.take();
                        }
                    }

                    // A completed future is in hand at this point.
                    running--;
                    try {
                        return finished.get();
                    } catch (ExecutionException failure) {
                        lastFailure = failure;
                    } catch (RuntimeException failure) {
                        lastFailure = new ExecutionException(failure);
                    }
                }

                // Defensive only: the running count drops to zero solely
                // after a failure was recorded above or a result returned.
                if (lastFailure == null) {
                    lastFailure = new ExecutionException((Throwable) null);
                }
                throw lastFailure;

            } finally {
                cancelAll(submitted);
            }
        }

        /**
         * Untimed <tt>invokeAny</tt>; delegates to the shared engine with
         * timing switched off.
         */
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            T result;
            try {
                result = doInvokeAny(tasks, false, 0);
            } catch (TimeoutException cannotHappen) {
                // The untimed path never takes the timed-poll branch.
                assert false;
                result = null;
            }
            return result;
        }

        /**
         * Timed <tt>invokeAny</tt>; converts the timeout to nanoseconds and
         * delegates to the shared engine.
         */
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            long budgetNanos = unit.toNanos(timeout);
            return doInvokeAny(tasks, true, budgetNanos);
        }

        /**
         * Executes every task and waits until each resulting future is done.
         * Failed or cancelled tasks do not abort the wait.  If the method
         * exits any other way than by returning normally, all futures
         * created so far are cancelled.
         */
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null) {
                throw new NullPointerException();
            }
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean allSettled = false;
            try {
                for (Callable<T> task : tasks) {
                    RunnableFuture<T> wrapped = newTaskFor(task);
                    futures.add(wrapped);
                    execute(wrapped);
                }
                for (Future<T> future : futures) {
                    if (future.isDone()) {
                        continue;
                    }
                    try {
                        future.get();
                    } catch (CancellationException ignore) {
                        // outcome is reported through the returned future
                    } catch (ExecutionException ignore) {
                        // outcome is reported through the returned future
                    }
                }
                allSettled = true;
                return futures;
            } finally {
                if (!allSettled) {
                    cancelAll(futures);
                }
            }
        }

        /**
         * Timed variant of <tt>invokeAll</tt>.  All tasks are wrapped up
         * front and then executed one by one, with the remaining time
         * re-checked after each <tt>execute</tt> call.  If the time budget
         * runs out, the list is returned as it stands and every future in
         * it receives a cancellation request.
         */
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null) {
                throw new NullPointerException();
            }
            if (unit == null) {
                throw new NullPointerException();
            }
            long nanos = unit.toNanos(timeout);
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean finishedInTime = false;
            try {
                for (Callable<T> task : tasks) {
                    futures.add(newTaskFor(task));
                }

                long lastTime = System.nanoTime();

                // Interleave time checks and calls to execute in case
                // executor doesn't have any/much parallelism.
                for (int i = 0, count = futures.size(); i < count; i++) {
                    execute((Runnable) futures.get(i));
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        return futures;
                    }
                }

                for (Future<T> future : futures) {
                    if (future.isDone()) {
                        continue;
                    }
                    if (nanos <= 0) {
                        return futures;
                    }
                    try {
                        future.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                        // outcome is reported through the returned future
                    } catch (ExecutionException ignore) {
                        // outcome is reported through the returned future
                    } catch (TimeoutException expired) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
                finishedInTime = true;
                return futures;
            } finally {
                if (!finishedInTime) {
                    cancelAll(futures);
                }
            }
        }

    }