rt/emul/compact/src/main/java/java/util/concurrent/AbstractExecutorService.java
branchjdk7-b147
changeset 1890 212417b74b72
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/AbstractExecutorService.java	Sat Mar 19 10:46:31 2016 +0100
     1.3 @@ -0,0 +1,309 @@
     1.4 +/*
     1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     1.6 + *
     1.7 + * This code is free software; you can redistribute it and/or modify it
     1.8 + * under the terms of the GNU General Public License version 2 only, as
     1.9 + * published by the Free Software Foundation.  Oracle designates this
    1.10 + * particular file as subject to the "Classpath" exception as provided
    1.11 + * by Oracle in the LICENSE file that accompanied this code.
    1.12 + *
    1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
    1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    1.15 + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    1.16 + * version 2 for more details (a copy is included in the LICENSE file that
    1.17 + * accompanied this code).
    1.18 + *
    1.19 + * You should have received a copy of the GNU General Public License version
    1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
    1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    1.22 + *
    1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    1.24 + * or visit www.oracle.com if you need additional information or have any
    1.25 + * questions.
    1.26 + */
    1.27 +
    1.28 +/*
    1.29 + * This file is available under and governed by the GNU General Public
    1.30 + * License version 2 only, as published by the Free Software Foundation.
    1.31 + * However, the following notice accompanied the original version of this
    1.32 + * file:
    1.33 + *
    1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
    1.35 + * Expert Group and released to the public domain, as explained at
    1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
    1.37 + */
    1.38 +
    1.39 +package java.util.concurrent;
    1.40 +import java.util.*;
    1.41 +
    1.42 +/**
    1.43 + * Provides default implementations of {@link ExecutorService}
    1.44 + * execution methods. This class implements the <tt>submit</tt>,
    1.45 + * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
    1.46 + * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
    1.47 + * to the {@link FutureTask} class provided in this package.  For example,
    1.48 + * the implementation of <tt>submit(Runnable)</tt> creates an
    1.49 + * associated <tt>RunnableFuture</tt> that is executed and
    1.50 + * returned. Subclasses may override the <tt>newTaskFor</tt> methods
    1.51 + * to return <tt>RunnableFuture</tt> implementations other than
    1.52 + * <tt>FutureTask</tt>.
    1.53 + *
    1.54 + * <p> <b>Extension example</b>. Here is a sketch of a class
    1.55 + * that customizes {@link ThreadPoolExecutor} to use
    1.56 + * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
    1.57 + *  <pre> {@code
    1.58 + * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    1.59 + *
    1.60 + *   static class CustomTask<V> implements RunnableFuture<V> {...}
    1.61 + *
    1.62 + *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    1.63 + *       return new CustomTask<V>(c);
    1.64 + *   }
    1.65 + *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
    1.66 + *       return new CustomTask<V>(r, v);
    1.67 + *   }
    1.68 + *   // ... add constructors, etc.
    1.69 + * }}</pre>
    1.70 + *
    1.71 + * @since 1.5
    1.72 + * @author Doug Lea
    1.73 + */
    1.74 +public abstract class AbstractExecutorService implements ExecutorService {
    1.75 +
    1.76 +    /**
    1.77 +     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
    1.78 +     * value.
    1.79 +     *
    1.80 +     * @param runnable the runnable task being wrapped
    1.81 +     * @param value the default value for the returned future
    1.82 +     * @return a <tt>RunnableFuture</tt> which when run will run the
    1.83 +     * underlying runnable and which, as a <tt>Future</tt>, will yield
    1.84 +     * the given value as its result and provide for cancellation of
    1.85 +     * the underlying task.
    1.86 +     * @since 1.6
    1.87 +     */
    1.88 +    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    1.89 +        return new FutureTask<T>(runnable, value);
    1.90 +    }
    1.91 +
    1.92 +    /**
    1.93 +     * Returns a <tt>RunnableFuture</tt> for the given callable task.
    1.94 +     *
    1.95 +     * @param callable the callable task being wrapped
    1.96 +     * @return a <tt>RunnableFuture</tt> which when run will call the
    1.97 +     * underlying callable and which, as a <tt>Future</tt>, will yield
    1.98 +     * the callable's result as its result and provide for
    1.99 +     * cancellation of the underlying task.
   1.100 +     * @since 1.6
   1.101 +     */
   1.102 +    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
   1.103 +        return new FutureTask<T>(callable);
   1.104 +    }
   1.105 +
   1.106 +    /**
   1.107 +     * @throws RejectedExecutionException {@inheritDoc}
   1.108 +     * @throws NullPointerException       {@inheritDoc}
   1.109 +     */
   1.110 +    public Future<?> submit(Runnable task) {
   1.111 +        if (task == null) throw new NullPointerException();
   1.112 +        RunnableFuture<Void> ftask = newTaskFor(task, null);
   1.113 +        execute(ftask);
   1.114 +        return ftask;
   1.115 +    }
   1.116 +
   1.117 +    /**
   1.118 +     * @throws RejectedExecutionException {@inheritDoc}
   1.119 +     * @throws NullPointerException       {@inheritDoc}
   1.120 +     */
   1.121 +    public <T> Future<T> submit(Runnable task, T result) {
   1.122 +        if (task == null) throw new NullPointerException();
   1.123 +        RunnableFuture<T> ftask = newTaskFor(task, result);
   1.124 +        execute(ftask);
   1.125 +        return ftask;
   1.126 +    }
   1.127 +
   1.128 +    /**
   1.129 +     * @throws RejectedExecutionException {@inheritDoc}
   1.130 +     * @throws NullPointerException       {@inheritDoc}
   1.131 +     */
   1.132 +    public <T> Future<T> submit(Callable<T> task) {
   1.133 +        if (task == null) throw new NullPointerException();
   1.134 +        RunnableFuture<T> ftask = newTaskFor(task);
   1.135 +        execute(ftask);
   1.136 +        return ftask;
   1.137 +    }
   1.138 +
   1.139 +    /**
   1.140 +     * the main mechanics of invokeAny.
   1.141 +     */
   1.142 +    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
   1.143 +                            boolean timed, long nanos)
   1.144 +        throws InterruptedException, ExecutionException, TimeoutException {
   1.145 +        if (tasks == null)
   1.146 +            throw new NullPointerException();
   1.147 +        int ntasks = tasks.size();
   1.148 +        if (ntasks == 0)
   1.149 +            throw new IllegalArgumentException();
   1.150 +        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
   1.151 +        ExecutorCompletionService<T> ecs =
   1.152 +            new ExecutorCompletionService<T>(this);
   1.153 +
   1.154 +        // For efficiency, especially in executors with limited
   1.155 +        // parallelism, check to see if previously submitted tasks are
   1.156 +        // done before submitting more of them. This interleaving
   1.157 +        // plus the exception mechanics account for messiness of main
   1.158 +        // loop.
   1.159 +
   1.160 +        try {
   1.161 +            // Record exceptions so that if we fail to obtain any
   1.162 +            // result, we can throw the last exception we got.
   1.163 +            ExecutionException ee = null;
   1.164 +            long lastTime = timed ? System.nanoTime() : 0;
   1.165 +            Iterator<? extends Callable<T>> it = tasks.iterator();
   1.166 +
   1.167 +            // Start one task for sure; the rest incrementally
   1.168 +            futures.add(ecs.submit(it.next()));
   1.169 +            --ntasks;
   1.170 +            int active = 1;
   1.171 +
   1.172 +            for (;;) {
   1.173 +                Future<T> f = ecs.poll();
   1.174 +                if (f == null) {
   1.175 +                    if (ntasks > 0) {
   1.176 +                        --ntasks;
   1.177 +                        futures.add(ecs.submit(it.next()));
   1.178 +                        ++active;
   1.179 +                    }
   1.180 +                    else if (active == 0)
   1.181 +                        break;
   1.182 +                    else if (timed) {
   1.183 +                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
   1.184 +                        if (f == null)
   1.185 +                            throw new TimeoutException();
   1.186 +                        long now = System.nanoTime();
   1.187 +                        nanos -= now - lastTime;
   1.188 +                        lastTime = now;
   1.189 +                    }
   1.190 +                    else
   1.191 +                        f = ecs.take();
   1.192 +                }
   1.193 +                if (f != null) {
   1.194 +                    --active;
   1.195 +                    try {
   1.196 +                        return f.get();
   1.197 +                    } catch (ExecutionException eex) {
   1.198 +                        ee = eex;
   1.199 +                    } catch (RuntimeException rex) {
   1.200 +                        ee = new ExecutionException(rex);
   1.201 +                    }
   1.202 +                }
   1.203 +            }
   1.204 +
   1.205 +            if (ee == null)
   1.206 +                ee = new ExecutionException();
   1.207 +            throw ee;
   1.208 +
   1.209 +        } finally {
   1.210 +            for (Future<T> f : futures)
   1.211 +                f.cancel(true);
   1.212 +        }
   1.213 +    }
   1.214 +
   1.215 +    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
   1.216 +        throws InterruptedException, ExecutionException {
   1.217 +        try {
   1.218 +            return doInvokeAny(tasks, false, 0);
   1.219 +        } catch (TimeoutException cannotHappen) {
   1.220 +            assert false;
   1.221 +            return null;
   1.222 +        }
   1.223 +    }
   1.224 +
   1.225 +    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
   1.226 +                           long timeout, TimeUnit unit)
   1.227 +        throws InterruptedException, ExecutionException, TimeoutException {
   1.228 +        return doInvokeAny(tasks, true, unit.toNanos(timeout));
   1.229 +    }
   1.230 +
   1.231 +    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
   1.232 +        throws InterruptedException {
   1.233 +        if (tasks == null)
   1.234 +            throw new NullPointerException();
   1.235 +        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
   1.236 +        boolean done = false;
   1.237 +        try {
   1.238 +            for (Callable<T> t : tasks) {
   1.239 +                RunnableFuture<T> f = newTaskFor(t);
   1.240 +                futures.add(f);
   1.241 +                execute(f);
   1.242 +            }
   1.243 +            for (Future<T> f : futures) {
   1.244 +                if (!f.isDone()) {
   1.245 +                    try {
   1.246 +                        f.get();
   1.247 +                    } catch (CancellationException ignore) {
   1.248 +                    } catch (ExecutionException ignore) {
   1.249 +                    }
   1.250 +                }
   1.251 +            }
   1.252 +            done = true;
   1.253 +            return futures;
   1.254 +        } finally {
   1.255 +            if (!done)
   1.256 +                for (Future<T> f : futures)
   1.257 +                    f.cancel(true);
   1.258 +        }
   1.259 +    }
   1.260 +
   1.261 +    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
   1.262 +                                         long timeout, TimeUnit unit)
   1.263 +        throws InterruptedException {
   1.264 +        if (tasks == null || unit == null)
   1.265 +            throw new NullPointerException();
   1.266 +        long nanos = unit.toNanos(timeout);
   1.267 +        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
   1.268 +        boolean done = false;
   1.269 +        try {
   1.270 +            for (Callable<T> t : tasks)
   1.271 +                futures.add(newTaskFor(t));
   1.272 +
   1.273 +            long lastTime = System.nanoTime();
   1.274 +
   1.275 +            // Interleave time checks and calls to execute in case
   1.276 +            // executor doesn't have any/much parallelism.
   1.277 +            Iterator<Future<T>> it = futures.iterator();
   1.278 +            while (it.hasNext()) {
   1.279 +                execute((Runnable)(it.next()));
   1.280 +                long now = System.nanoTime();
   1.281 +                nanos -= now - lastTime;
   1.282 +                lastTime = now;
   1.283 +                if (nanos <= 0)
   1.284 +                    return futures;
   1.285 +            }
   1.286 +
   1.287 +            for (Future<T> f : futures) {
   1.288 +                if (!f.isDone()) {
   1.289 +                    if (nanos <= 0)
   1.290 +                        return futures;
   1.291 +                    try {
   1.292 +                        f.get(nanos, TimeUnit.NANOSECONDS);
   1.293 +                    } catch (CancellationException ignore) {
   1.294 +                    } catch (ExecutionException ignore) {
   1.295 +                    } catch (TimeoutException toe) {
   1.296 +                        return futures;
   1.297 +                    }
   1.298 +                    long now = System.nanoTime();
   1.299 +                    nanos -= now - lastTime;
   1.300 +                    lastTime = now;
   1.301 +                }
   1.302 +            }
   1.303 +            done = true;
   1.304 +            return futures;
   1.305 +        } finally {
   1.306 +            if (!done)
   1.307 +                for (Future<T> f : futures)
   1.308 +                    f.cancel(true);
   1.309 +        }
   1.310 +    }
   1.311 +
   1.312 +}