1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/AbstractExecutorService.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,309 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +import java.util.*;
1.41 +
1.42 +/**
1.43 + * Provides default implementations of {@link ExecutorService}
1.44 + * execution methods. This class implements the <tt>submit</tt>,
1.45 + * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
1.46 + * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
1.47 + * to the {@link FutureTask} class provided in this package. For example,
1.48 + * the implementation of <tt>submit(Runnable)</tt> creates an
1.49 + * associated <tt>RunnableFuture</tt> that is executed and
1.50 + * returned. Subclasses may override the <tt>newTaskFor</tt> methods
1.51 + * to return <tt>RunnableFuture</tt> implementations other than
1.52 + * <tt>FutureTask</tt>.
1.53 + *
1.54 + * <p> <b>Extension example</b>. Here is a sketch of a class
1.55 + * that customizes {@link ThreadPoolExecutor} to use
1.56 + * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
1.57 + * <pre> {@code
1.58 + * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
1.59 + *
1.60 + * static class CustomTask<V> implements RunnableFuture<V> {...}
1.61 + *
1.62 + * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
1.63 + * return new CustomTask<V>(c);
1.64 + * }
1.65 + * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
1.66 + * return new CustomTask<V>(r, v);
1.67 + * }
1.68 + * // ... add constructors, etc.
1.69 + * }}</pre>
1.70 + *
1.71 + * @since 1.5
1.72 + * @author Doug Lea
1.73 + */
1.74 +public abstract class AbstractExecutorService implements ExecutorService {
1.75 +
1.76 + /**
1.77 + * Returns a <tt>RunnableFuture</tt> for the given runnable and default
1.78 + * value.
1.79 + *
1.80 + * @param runnable the runnable task being wrapped
1.81 + * @param value the default value for the returned future
1.82 + * @return a <tt>RunnableFuture</tt> which when run will run the
1.83 + * underlying runnable and which, as a <tt>Future</tt>, will yield
1.84 + * the given value as its result and provide for cancellation of
1.85 + * the underlying task.
1.86 + * @since 1.6
1.87 + */
1.88 + protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1.89 + return new FutureTask<T>(runnable, value);
1.90 + }
1.91 +
1.92 + /**
1.93 + * Returns a <tt>RunnableFuture</tt> for the given callable task.
1.94 + *
1.95 + * @param callable the callable task being wrapped
1.96 + * @return a <tt>RunnableFuture</tt> which when run will call the
1.97 + * underlying callable and which, as a <tt>Future</tt>, will yield
1.98 + * the callable's result as its result and provide for
1.99 + * cancellation of the underlying task.
1.100 + * @since 1.6
1.101 + */
1.102 + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1.103 + return new FutureTask<T>(callable);
1.104 + }
1.105 +
1.106 + /**
1.107 + * @throws RejectedExecutionException {@inheritDoc}
1.108 + * @throws NullPointerException {@inheritDoc}
1.109 + */
1.110 + public Future<?> submit(Runnable task) {
1.111 + if (task == null) throw new NullPointerException();
1.112 + RunnableFuture<Void> ftask = newTaskFor(task, null);
1.113 + execute(ftask);
1.114 + return ftask;
1.115 + }
1.116 +
1.117 + /**
1.118 + * @throws RejectedExecutionException {@inheritDoc}
1.119 + * @throws NullPointerException {@inheritDoc}
1.120 + */
1.121 + public <T> Future<T> submit(Runnable task, T result) {
1.122 + if (task == null) throw new NullPointerException();
1.123 + RunnableFuture<T> ftask = newTaskFor(task, result);
1.124 + execute(ftask);
1.125 + return ftask;
1.126 + }
1.127 +
1.128 + /**
1.129 + * @throws RejectedExecutionException {@inheritDoc}
1.130 + * @throws NullPointerException {@inheritDoc}
1.131 + */
1.132 + public <T> Future<T> submit(Callable<T> task) {
1.133 + if (task == null) throw new NullPointerException();
1.134 + RunnableFuture<T> ftask = newTaskFor(task);
1.135 + execute(ftask);
1.136 + return ftask;
1.137 + }
1.138 +
1.139 + /**
1.140 + * the main mechanics of invokeAny.
1.141 + */
1.142 + private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
1.143 + boolean timed, long nanos)
1.144 + throws InterruptedException, ExecutionException, TimeoutException {
1.145 + if (tasks == null)
1.146 + throw new NullPointerException();
1.147 + int ntasks = tasks.size();
1.148 + if (ntasks == 0)
1.149 + throw new IllegalArgumentException();
1.150 + List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
1.151 + ExecutorCompletionService<T> ecs =
1.152 + new ExecutorCompletionService<T>(this);
1.153 +
1.154 + // For efficiency, especially in executors with limited
1.155 + // parallelism, check to see if previously submitted tasks are
1.156 + // done before submitting more of them. This interleaving
1.157 + // plus the exception mechanics account for messiness of main
1.158 + // loop.
1.159 +
1.160 + try {
1.161 + // Record exceptions so that if we fail to obtain any
1.162 + // result, we can throw the last exception we got.
1.163 + ExecutionException ee = null;
1.164 + long lastTime = timed ? System.nanoTime() : 0;
1.165 + Iterator<? extends Callable<T>> it = tasks.iterator();
1.166 +
1.167 + // Start one task for sure; the rest incrementally
1.168 + futures.add(ecs.submit(it.next()));
1.169 + --ntasks;
1.170 + int active = 1;
1.171 +
1.172 + for (;;) {
1.173 + Future<T> f = ecs.poll();
1.174 + if (f == null) {
1.175 + if (ntasks > 0) {
1.176 + --ntasks;
1.177 + futures.add(ecs.submit(it.next()));
1.178 + ++active;
1.179 + }
1.180 + else if (active == 0)
1.181 + break;
1.182 + else if (timed) {
1.183 + f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
1.184 + if (f == null)
1.185 + throw new TimeoutException();
1.186 + long now = System.nanoTime();
1.187 + nanos -= now - lastTime;
1.188 + lastTime = now;
1.189 + }
1.190 + else
1.191 + f = ecs.take();
1.192 + }
1.193 + if (f != null) {
1.194 + --active;
1.195 + try {
1.196 + return f.get();
1.197 + } catch (ExecutionException eex) {
1.198 + ee = eex;
1.199 + } catch (RuntimeException rex) {
1.200 + ee = new ExecutionException(rex);
1.201 + }
1.202 + }
1.203 + }
1.204 +
1.205 + if (ee == null)
1.206 + ee = new ExecutionException();
1.207 + throw ee;
1.208 +
1.209 + } finally {
1.210 + for (Future<T> f : futures)
1.211 + f.cancel(true);
1.212 + }
1.213 + }
1.214 +
1.215 + public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
1.216 + throws InterruptedException, ExecutionException {
1.217 + try {
1.218 + return doInvokeAny(tasks, false, 0);
1.219 + } catch (TimeoutException cannotHappen) {
1.220 + assert false;
1.221 + return null;
1.222 + }
1.223 + }
1.224 +
1.225 + public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
1.226 + long timeout, TimeUnit unit)
1.227 + throws InterruptedException, ExecutionException, TimeoutException {
1.228 + return doInvokeAny(tasks, true, unit.toNanos(timeout));
1.229 + }
1.230 +
1.231 + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
1.232 + throws InterruptedException {
1.233 + if (tasks == null)
1.234 + throw new NullPointerException();
1.235 + List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
1.236 + boolean done = false;
1.237 + try {
1.238 + for (Callable<T> t : tasks) {
1.239 + RunnableFuture<T> f = newTaskFor(t);
1.240 + futures.add(f);
1.241 + execute(f);
1.242 + }
1.243 + for (Future<T> f : futures) {
1.244 + if (!f.isDone()) {
1.245 + try {
1.246 + f.get();
1.247 + } catch (CancellationException ignore) {
1.248 + } catch (ExecutionException ignore) {
1.249 + }
1.250 + }
1.251 + }
1.252 + done = true;
1.253 + return futures;
1.254 + } finally {
1.255 + if (!done)
1.256 + for (Future<T> f : futures)
1.257 + f.cancel(true);
1.258 + }
1.259 + }
1.260 +
1.261 + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
1.262 + long timeout, TimeUnit unit)
1.263 + throws InterruptedException {
1.264 + if (tasks == null || unit == null)
1.265 + throw new NullPointerException();
1.266 + long nanos = unit.toNanos(timeout);
1.267 + List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
1.268 + boolean done = false;
1.269 + try {
1.270 + for (Callable<T> t : tasks)
1.271 + futures.add(newTaskFor(t));
1.272 +
1.273 + long lastTime = System.nanoTime();
1.274 +
1.275 + // Interleave time checks and calls to execute in case
1.276 + // executor doesn't have any/much parallelism.
1.277 + Iterator<Future<T>> it = futures.iterator();
1.278 + while (it.hasNext()) {
1.279 + execute((Runnable)(it.next()));
1.280 + long now = System.nanoTime();
1.281 + nanos -= now - lastTime;
1.282 + lastTime = now;
1.283 + if (nanos <= 0)
1.284 + return futures;
1.285 + }
1.286 +
1.287 + for (Future<T> f : futures) {
1.288 + if (!f.isDone()) {
1.289 + if (nanos <= 0)
1.290 + return futures;
1.291 + try {
1.292 + f.get(nanos, TimeUnit.NANOSECONDS);
1.293 + } catch (CancellationException ignore) {
1.294 + } catch (ExecutionException ignore) {
1.295 + } catch (TimeoutException toe) {
1.296 + return futures;
1.297 + }
1.298 + long now = System.nanoTime();
1.299 + nanos -= now - lastTime;
1.300 + lastTime = now;
1.301 + }
1.302 + }
1.303 + done = true;
1.304 + return futures;
1.305 + } finally {
1.306 + if (!done)
1.307 + for (Future<T> f : futures)
1.308 + f.cancel(true);
1.309 + }
1.310 + }
1.311 +
1.312 +}