rt/emul/compact/src/main/java/java/util/concurrent/ExecutorCompletionService.java
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/ExecutorCompletionService.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,205 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +
1.41 +/**
1.42 + * A {@link CompletionService} that uses a supplied {@link Executor}
1.43 + * to execute tasks. This class arranges that submitted tasks are,
1.44 + * upon completion, placed on a queue accessible using {@code take}.
1.45 + * The class is lightweight enough to be suitable for transient use
1.46 + * when processing groups of tasks.
1.47 + *
1.48 + * <p>
1.49 + *
1.50 + * <b>Usage Examples.</b>
1.51 + *
1.52 + * Suppose you have a set of solvers for a certain problem, each
1.53 + * returning a value of some type {@code Result}, and would like to
1.54 + * run them concurrently, processing the results of each of them that
1.55 + * return a non-null value, in some method {@code use(Result r)}. You
1.56 + * could write this as:
1.57 + *
1.58 + * <pre> {@code
1.59 + * void solve(Executor e,
1.60 + * Collection<Callable<Result>> solvers)
1.61 + * throws InterruptedException, ExecutionException {
1.62 + * CompletionService<Result> ecs
1.63 + * = new ExecutorCompletionService<Result>(e);
1.64 + * for (Callable<Result> s : solvers)
1.65 + * ecs.submit(s);
1.66 + * int n = solvers.size();
1.67 + * for (int i = 0; i < n; ++i) {
1.68 + * Result r = ecs.take().get();
1.69 + * if (r != null)
1.70 + * use(r);
1.71 + * }
1.72 + * }}</pre>
1.73 + *
1.74 + * Suppose instead that you would like to use the first non-null result
1.75 + * of the set of tasks, ignoring any that encounter exceptions,
1.76 + * and cancelling all other tasks when the first one is ready:
1.77 + *
1.78 + * <pre> {@code
1.79 + * void solve(Executor e,
1.80 + * Collection<Callable<Result>> solvers)
1.81 + * throws InterruptedException {
1.82 + * CompletionService<Result> ecs
1.83 + * = new ExecutorCompletionService<Result>(e);
1.84 + * int n = solvers.size();
1.85 + * List<Future<Result>> futures
1.86 + * = new ArrayList<Future<Result>>(n);
1.87 + * Result result = null;
1.88 + * try {
1.89 + * for (Callable<Result> s : solvers)
1.90 + * futures.add(ecs.submit(s));
1.91 + * for (int i = 0; i < n; ++i) {
1.92 + * try {
1.93 + * Result r = ecs.take().get();
1.94 + * if (r != null) {
1.95 + * result = r;
1.96 + * break;
1.97 + * }
1.98 + * } catch (ExecutionException ignore) {}
1.99 + * }
1.100 + * }
1.101 + * finally {
1.102 + * for (Future<Result> f : futures)
1.103 + * f.cancel(true);
1.104 + * }
1.105 + *
1.106 + * if (result != null)
1.107 + * use(result);
1.108 + * }}</pre>
1.109 + */
1.110 +public class ExecutorCompletionService<V> implements CompletionService<V> {
1.111 + private final Executor executor;
1.112 + private final AbstractExecutorService aes;
1.113 + private final BlockingQueue<Future<V>> completionQueue;
1.114 +
1.115 + /**
1.116 + * FutureTask extension to enqueue upon completion
1.117 + */
1.118 + private class QueueingFuture extends FutureTask<Void> {
1.119 + QueueingFuture(RunnableFuture<V> task) {
1.120 + super(task, null);
1.121 + this.task = task;
1.122 + }
1.123 + protected void done() { completionQueue.add(task); }
1.124 + private final Future<V> task;
1.125 + }
1.126 +
1.127 + private RunnableFuture<V> newTaskFor(Callable<V> task) {
1.128 + if (aes == null)
1.129 + return new FutureTask<V>(task);
1.130 + else
1.131 + return aes.newTaskFor(task);
1.132 + }
1.133 +
1.134 + private RunnableFuture<V> newTaskFor(Runnable task, V result) {
1.135 + if (aes == null)
1.136 + return new FutureTask<V>(task, result);
1.137 + else
1.138 + return aes.newTaskFor(task, result);
1.139 + }
1.140 +
1.141 + /**
1.142 + * Creates an ExecutorCompletionService using the supplied
1.143 + * executor for base task execution and a
1.144 + * {@link LinkedBlockingQueue} as a completion queue.
1.145 + *
1.146 + * @param executor the executor to use
1.147 + * @throws NullPointerException if executor is {@code null}
1.148 + */
1.149 + public ExecutorCompletionService(Executor executor) {
1.150 + if (executor == null)
1.151 + throw new NullPointerException();
1.152 + this.executor = executor;
1.153 + this.aes = (executor instanceof AbstractExecutorService) ?
1.154 + (AbstractExecutorService) executor : null;
1.155 + this.completionQueue = new LinkedBlockingQueue<Future<V>>();
1.156 + }
1.157 +
1.158 + /**
1.159 + * Creates an ExecutorCompletionService using the supplied
1.160 + * executor for base task execution and the supplied queue as its
1.161 + * completion queue.
1.162 + *
1.163 + * @param executor the executor to use
1.164 + * @param completionQueue the queue to use as the completion queue
1.165 + * normally one dedicated for use by this service. This
1.166 + * queue is treated as unbounded -- failed attempted
1.167 + * {@code Queue.add} operations for completed taskes cause
1.168 + * them not to be retrievable.
1.169 + * @throws NullPointerException if executor or completionQueue are {@code null}
1.170 + */
1.171 + public ExecutorCompletionService(Executor executor,
1.172 + BlockingQueue<Future<V>> completionQueue) {
1.173 + if (executor == null || completionQueue == null)
1.174 + throw new NullPointerException();
1.175 + this.executor = executor;
1.176 + this.aes = (executor instanceof AbstractExecutorService) ?
1.177 + (AbstractExecutorService) executor : null;
1.178 + this.completionQueue = completionQueue;
1.179 + }
1.180 +
1.181 + public Future<V> submit(Callable<V> task) {
1.182 + if (task == null) throw new NullPointerException();
1.183 + RunnableFuture<V> f = newTaskFor(task);
1.184 + executor.execute(new QueueingFuture(f));
1.185 + return f;
1.186 + }
1.187 +
1.188 + public Future<V> submit(Runnable task, V result) {
1.189 + if (task == null) throw new NullPointerException();
1.190 + RunnableFuture<V> f = newTaskFor(task, result);
1.191 + executor.execute(new QueueingFuture(f));
1.192 + return f;
1.193 + }
1.194 +
1.195 + public Future<V> take() throws InterruptedException {
1.196 + return completionQueue.take();
1.197 + }
1.198 +
1.199 + public Future<V> poll() {
1.200 + return completionQueue.poll();
1.201 + }
1.202 +
1.203 + public Future<V> poll(long timeout, TimeUnit unit)
1.204 + throws InterruptedException {
1.205 + return completionQueue.poll(timeout, unit);
1.206 + }
1.207 +
1.208 +}