jaroslav@1890: /* jaroslav@1890: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. jaroslav@1890: * jaroslav@1890: * This code is free software; you can redistribute it and/or modify it jaroslav@1890: * under the terms of the GNU General Public License version 2 only, as jaroslav@1890: * published by the Free Software Foundation. Oracle designates this jaroslav@1890: * particular file as subject to the "Classpath" exception as provided jaroslav@1890: * by Oracle in the LICENSE file that accompanied this code. jaroslav@1890: * jaroslav@1890: * This code is distributed in the hope that it will be useful, but WITHOUT jaroslav@1890: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or jaroslav@1890: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License jaroslav@1890: * version 2 for more details (a copy is included in the LICENSE file that jaroslav@1890: * accompanied this code). jaroslav@1890: * jaroslav@1890: * You should have received a copy of the GNU General Public License version jaroslav@1890: * 2 along with this work; if not, write to the Free Software Foundation, jaroslav@1890: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. jaroslav@1890: * jaroslav@1890: * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA jaroslav@1890: * or visit www.oracle.com if you need additional information or have any jaroslav@1890: * questions. jaroslav@1890: */ jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * This file is available under and governed by the GNU General Public jaroslav@1890: * License version 2 only, as published by the Free Software Foundation. 
jaroslav@1890: * However, the following notice accompanied the original version of this jaroslav@1890: * file: jaroslav@1890: * jaroslav@1890: * Written by Doug Lea with assistance from members of JCP JSR-166 jaroslav@1890: * Expert Group and released to the public domain, as explained at jaroslav@1890: * http://creativecommons.org/publicdomain/zero/1.0/ jaroslav@1890: */ jaroslav@1890: jaroslav@1890: package java.util.concurrent; jaroslav@1890: import java.util.concurrent.locks.*; jaroslav@1890: import java.util.concurrent.atomic.*; jaroslav@1890: import java.util.*; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * An {@link ExecutorService} that executes each submitted task using jaroslav@1890: * one of possibly several pooled threads, normally configured jaroslav@1890: * using {@link Executors} factory methods. jaroslav@1890: * jaroslav@1890: *
Thread pools address two different problems: they usually jaroslav@1890: * provide improved performance when executing large numbers of jaroslav@1890: * asynchronous tasks, due to reduced per-task invocation overhead, jaroslav@1890: * and they provide a means of bounding and managing the resources, jaroslav@1890: * including threads, consumed when executing a collection of tasks. jaroslav@1890: * Each {@code ThreadPoolExecutor} also maintains some basic jaroslav@1890: * statistics, such as the number of completed tasks. jaroslav@1890: * jaroslav@1890: *
To be useful across a wide range of contexts, this class jaroslav@1890: * provides many adjustable parameters and extensibility jaroslav@1890: * hooks. However, programmers are urged to use the more convenient jaroslav@1890: * {@link Executors} factory methods {@link jaroslav@1890: * Executors#newCachedThreadPool} (unbounded thread pool, with jaroslav@1890: * automatic thread reclamation), {@link Executors#newFixedThreadPool} jaroslav@1890: * (fixed size thread pool) and {@link jaroslav@1890: * Executors#newSingleThreadExecutor} (single background thread), that jaroslav@1890: * preconfigure settings for the most common usage jaroslav@1890: * scenarios. Otherwise, use the following guide when manually jaroslav@1890: * configuring and tuning this class: jaroslav@1890: * jaroslav@1890: *
If hook or callback methods throw exceptions, internal worker jaroslav@1890: * threads may in turn fail and abruptly terminate.
Extension example. Most extensions of this class jaroslav@1890: * override one or more of the protected hook methods. For example, jaroslav@1890: * here is a subclass that adds a simple pause/resume feature: jaroslav@1890: * jaroslav@1890: *
{@code jaroslav@1890: * class PausableThreadPoolExecutor extends ThreadPoolExecutor { jaroslav@1890: * private boolean isPaused; jaroslav@1890: * private ReentrantLock pauseLock = new ReentrantLock(); jaroslav@1890: * private Condition unpaused = pauseLock.newCondition(); jaroslav@1890: * jaroslav@1890: * public PausableThreadPoolExecutor(...) { super(...); } jaroslav@1890: * jaroslav@1890: * protected void beforeExecute(Thread t, Runnable r) { jaroslav@1890: * super.beforeExecute(t, r); jaroslav@1890: * pauseLock.lock(); jaroslav@1890: * try { jaroslav@1890: * while (isPaused) unpaused.await(); jaroslav@1890: * } catch (InterruptedException ie) { jaroslav@1890: * t.interrupt(); jaroslav@1890: * } finally { jaroslav@1890: * pauseLock.unlock(); jaroslav@1890: * } jaroslav@1890: * } jaroslav@1890: * jaroslav@1890: * public void pause() { jaroslav@1890: * pauseLock.lock(); jaroslav@1890: * try { jaroslav@1890: * isPaused = true; jaroslav@1890: * } finally { jaroslav@1890: * pauseLock.unlock(); jaroslav@1890: * } jaroslav@1890: * } jaroslav@1890: * jaroslav@1890: * public void resume() { jaroslav@1890: * pauseLock.lock(); jaroslav@1890: * try { jaroslav@1890: * isPaused = false; jaroslav@1890: * unpaused.signalAll(); jaroslav@1890: * } finally { jaroslav@1890: * pauseLock.unlock(); jaroslav@1890: * } jaroslav@1890: * } jaroslav@1890: * }}jaroslav@1890: * jaroslav@1890: * @since 1.5 jaroslav@1890: * @author Doug Lea jaroslav@1890: */ jaroslav@1890: public class ThreadPoolExecutor extends AbstractExecutorService { jaroslav@1890: /** jaroslav@1890: * The main pool control state, ctl, is an atomic integer packing jaroslav@1890: * two conceptual fields jaroslav@1890: * workerCount, indicating the effective number of threads jaroslav@1890: * runState, indicating whether running, shutting down etc jaroslav@1890: * jaroslav@1890: * In order to pack them into one int, we limit workerCount to jaroslav@1890: * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 
jaroslav@1890: * billion) otherwise representable. If this is ever an issue in jaroslav@1890: * the future, the variable can be changed to be an AtomicLong, jaroslav@1890: * and the shift/mask constants below adjusted. But until the need jaroslav@1890: * arises, this code is a bit faster and simpler using an int. jaroslav@1890: * jaroslav@1890: * The workerCount is the number of workers that have been jaroslav@1890: * permitted to start and not permitted to stop. The value may be jaroslav@1890: * transiently different from the actual number of live threads, jaroslav@1890: * for example when a ThreadFactory fails to create a thread when jaroslav@1890: * asked, and when exiting threads are still performing jaroslav@1890: * bookkeeping before terminating. The user-visible pool size is jaroslav@1890: * reported as the current size of the workers set. jaroslav@1890: * jaroslav@1890: * The runState provides the main lifecycle control, taking on values: jaroslav@1890: * jaroslav@1890: * RUNNING: Accept new tasks and process queued tasks jaroslav@1890: * SHUTDOWN: Don't accept new tasks, but process queued tasks jaroslav@1890: * STOP: Don't accept new tasks, don't process queued tasks, jaroslav@1890: * and interrupt in-progress tasks jaroslav@1890: * TIDYING: All tasks have terminated, workerCount is zero, jaroslav@1890: * the thread transitioning to state TIDYING jaroslav@1890: * will run the terminated() hook method jaroslav@1890: * TERMINATED: terminated() has completed jaroslav@1890: * jaroslav@1890: * The numerical order among these values matters, to allow jaroslav@1890: * ordered comparisons. The runState monotonically increases over jaroslav@1890: * time, but need not hit each state. 
The transitions are: jaroslav@1890: * jaroslav@1890: * RUNNING -> SHUTDOWN jaroslav@1890: * On invocation of shutdown(), perhaps implicitly in finalize() jaroslav@1890: * (RUNNING or SHUTDOWN) -> STOP jaroslav@1890: * On invocation of shutdownNow() jaroslav@1890: * SHUTDOWN -> TIDYING jaroslav@1890: * When both queue and pool are empty jaroslav@1890: * STOP -> TIDYING jaroslav@1890: * When pool is empty jaroslav@1890: * TIDYING -> TERMINATED jaroslav@1890: * When the terminated() hook method has completed jaroslav@1890: * jaroslav@1890: * Threads waiting in awaitTermination() will return when the jaroslav@1890: * state reaches TERMINATED. jaroslav@1890: * jaroslav@1890: * Detecting the transition from SHUTDOWN to TIDYING is less jaroslav@1890: * straightforward than you'd like because the queue may become jaroslav@1890: * empty after non-empty and vice versa during SHUTDOWN state, but jaroslav@1890: * we can only terminate if, after seeing that it is empty, we see jaroslav@1890: * that workerCount is 0 (which sometimes entails a recheck -- see jaroslav@1890: * below). 
jaroslav@1890: */ jaroslav@1890: private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); jaroslav@1890: private static final int COUNT_BITS = Integer.SIZE - 3; jaroslav@1890: private static final int CAPACITY = (1 << COUNT_BITS) - 1; jaroslav@1890: jaroslav@1890: // runState is stored in the high-order bits jaroslav@1890: private static final int RUNNING = -1 << COUNT_BITS; jaroslav@1890: private static final int SHUTDOWN = 0 << COUNT_BITS; jaroslav@1890: private static final int STOP = 1 << COUNT_BITS; jaroslav@1890: private static final int TIDYING = 2 << COUNT_BITS; jaroslav@1890: private static final int TERMINATED = 3 << COUNT_BITS; jaroslav@1890: jaroslav@1890: // Packing and unpacking ctl jaroslav@1890: private static int runStateOf(int c) { return c & ~CAPACITY; } jaroslav@1890: private static int workerCountOf(int c) { return c & CAPACITY; } jaroslav@1890: private static int ctlOf(int rs, int wc) { return rs | wc; } jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * Bit field accessors that don't require unpacking ctl. jaroslav@1890: * These depend on the bit layout and on workerCount being never negative. jaroslav@1890: */ jaroslav@1890: jaroslav@1890: private static boolean runStateLessThan(int c, int s) { jaroslav@1890: return c < s; jaroslav@1890: } jaroslav@1890: jaroslav@1890: private static boolean runStateAtLeast(int c, int s) { jaroslav@1890: return c >= s; jaroslav@1890: } jaroslav@1890: jaroslav@1890: private static boolean isRunning(int c) { jaroslav@1890: return c < SHUTDOWN; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Attempt to CAS-increment the workerCount field of ctl. jaroslav@1890: */ jaroslav@1890: private boolean compareAndIncrementWorkerCount(int expect) { jaroslav@1890: return ctl.compareAndSet(expect, expect + 1); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Attempt to CAS-decrement the workerCount field of ctl. 
jaroslav@1890: */ jaroslav@1890: private boolean compareAndDecrementWorkerCount(int expect) { jaroslav@1890: return ctl.compareAndSet(expect, expect - 1); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Decrements the workerCount field of ctl. This is called only on jaroslav@1890: * abrupt termination of a thread (see processWorkerExit). Other jaroslav@1890: * decrements are performed within getTask. jaroslav@1890: */ jaroslav@1890: private void decrementWorkerCount() { jaroslav@1890: do {} while (! compareAndDecrementWorkerCount(ctl.get())); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * The queue used for holding tasks and handing off to worker jaroslav@1890: * threads. We do not require that workQueue.poll() returning jaroslav@1890: * null necessarily means that workQueue.isEmpty(), so rely jaroslav@1890: * solely on isEmpty to see if the queue is empty (which we must jaroslav@1890: * do for example when deciding whether to transition from jaroslav@1890: * SHUTDOWN to TIDYING). This accommodates special-purpose jaroslav@1890: * queues such as DelayQueues for which poll() is allowed to jaroslav@1890: * return null even if it may later return non-null when delays jaroslav@1890: * expire. jaroslav@1890: */ jaroslav@1890: private final BlockingQueue
This method does not wait for previously submitted tasks to jaroslav@1890: * complete execution. Use {@link #awaitTermination awaitTermination} jaroslav@1890: * to do that. jaroslav@1890: * jaroslav@1890: * @throws SecurityException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public void shutdown() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: checkShutdownAccess(); jaroslav@1890: advanceRunState(SHUTDOWN); jaroslav@1890: interruptIdleWorkers(); jaroslav@1890: onShutdown(); // hook for ScheduledThreadPoolExecutor jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: tryTerminate(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Attempts to stop all actively executing tasks, halts the jaroslav@1890: * processing of waiting tasks, and returns a list of the tasks jaroslav@1890: * that were awaiting execution. These tasks are drained (removed) jaroslav@1890: * from the task queue upon return from this method. jaroslav@1890: * jaroslav@1890: *
This method does not wait for actively executing tasks to jaroslav@1890: * terminate. Use {@link #awaitTermination awaitTermination} to jaroslav@1890: * do that. jaroslav@1890: * jaroslav@1890: *
There are no guarantees beyond best-effort attempts to stop
jaroslav@1890: * processing actively executing tasks. This implementation
jaroslav@1890: * cancels tasks via {@link Thread#interrupt}, so any task that
jaroslav@1890: * fails to respond to interrupts may never terminate.
jaroslav@1890: *
jaroslav@1890: * @throws SecurityException {@inheritDoc}
jaroslav@1890: */
jaroslav@1890: public List This method may be useful as one part of a cancellation
jaroslav@1890: * scheme. It may fail to remove tasks that have been converted
jaroslav@1890: * into other forms before being placed on the internal queue. For
jaroslav@1890: * example, a task entered using {@code submit} might be
jaroslav@1890: * converted into a form that maintains {@code Future} status.
jaroslav@1890: * However, in such cases, method {@link #purge} may be used to
jaroslav@1890: * remove those Futures that have been cancelled.
jaroslav@1890: *
jaroslav@1890: * @param task the task to remove
jaroslav@1890: * @return true if the task was removed
jaroslav@1890: */
jaroslav@1890: public boolean remove(Runnable task) {
jaroslav@1890:     // Take the task out of the pending queue if it is still waiting there.
jaroslav@1890:     final boolean wasQueued = workQueue.remove(task);
jaroslav@1890:     // The removal may have left the queue empty while in SHUTDOWN,
jaroslav@1890:     // so give the pool a chance to advance towards termination.
jaroslav@1890:     tryTerminate();
jaroslav@1890:     return wasQueued;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Tries to remove from the work queue all {@link Future}
jaroslav@1890: * tasks that have been cancelled. This method can be useful as a
jaroslav@1890: * storage reclamation operation, that has no other impact on
jaroslav@1890: * functionality. Cancelled tasks are never executed, but may
jaroslav@1890: * accumulate in work queues until worker threads can actively
jaroslav@1890: * remove them. Invoking this method instead tries to remove them now.
jaroslav@1890: * However, this method may fail to remove tasks in
jaroslav@1890: * the presence of interference by other threads.
jaroslav@1890: */
jaroslav@1890: public void purge() {
jaroslav@1890: final BlockingQueue This implementation does nothing, but may be customized in
jaroslav@1890: * subclasses. Note: To properly nest multiple overridings, subclasses
jaroslav@1890: * should generally invoke {@code super.beforeExecute} at the end of
jaroslav@1890: * this method.
jaroslav@1890: *
jaroslav@1890: * @param t the thread that will run task {@code r}
jaroslav@1890: * @param r the task that will be executed
jaroslav@1890: */
jaroslav@1890: protected void beforeExecute(Thread t, Runnable r) { /* Default hook body is intentionally empty; subclasses may override it. */ }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Method invoked upon completion of execution of the given Runnable.
jaroslav@1890: * This method is invoked by the thread that executed the task. If
jaroslav@1890: * non-null, the Throwable is the uncaught {@code RuntimeException}
jaroslav@1890: * or {@code Error} that caused execution to terminate abruptly.
jaroslav@1890: *
jaroslav@1890: * <p>This implementation does nothing, but may be customized in
jaroslav@1890: * subclasses. Note: To properly nest multiple overridings, subclasses
jaroslav@1890: * should generally invoke {@code super.afterExecute} at the
jaroslav@1890: * beginning of this method.
jaroslav@1890: *
jaroslav@1890: * <p>Note: When actions are enclosed in tasks (such as
jaroslav@1890: * {@link FutureTask}) either explicitly or via methods such as
jaroslav@1890: * {@code submit}, these task objects catch and maintain
jaroslav@1890: * computational exceptions, and so they do not cause abrupt
jaroslav@1890: * termination, and the internal exceptions are not
jaroslav@1890: * passed to this method. If you would like to trap both kinds of
jaroslav@1890: * failures in this method, you can further probe for such cases,
jaroslav@1890: * as in this sample subclass that prints either the direct cause
jaroslav@1890: * or the underlying exception if a task has been aborted:
jaroslav@1890: *
jaroslav@1890: * <pre> {@code
jaroslav@1890: * class ExtendedExecutor extends ThreadPoolExecutor {
jaroslav@1890: *   // ...
jaroslav@1890: *   protected void afterExecute(Runnable r, Throwable t) {
jaroslav@1890: *     super.afterExecute(r, t);
jaroslav@1890: *     if (t == null && r instanceof Future<?>) {
jaroslav@1890: *       try {
jaroslav@1890: *         Object result = ((Future<?>) r).get();
jaroslav@1890: *       } catch (CancellationException ce) {
jaroslav@1890: *           t = ce;
jaroslav@1890: *       } catch (ExecutionException ee) {
jaroslav@1890: *           t = ee.getCause();
jaroslav@1890: *       } catch (InterruptedException ie) {
jaroslav@1890: *           Thread.currentThread().interrupt(); // ignore/reset
jaroslav@1890: *       }
jaroslav@1890: *     }
jaroslav@1890: *     if (t != null)
jaroslav@1890: *       System.out.println(t);
jaroslav@1890: *   }
jaroslav@1890: * }}</pre>
jaroslav@1890: *
jaroslav@1890: * @param r the runnable that has completed
jaroslav@1890: * @param t the exception that caused termination, or null if
jaroslav@1890: * execution completed normally
jaroslav@1890: */
jaroslav@1890: protected void afterExecute(Runnable r, Throwable t) { /* Default hook body is intentionally empty; subclasses may override it. */ }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Method invoked when the Executor has terminated. Default
jaroslav@1890: * implementation does nothing. Note: To properly nest multiple
jaroslav@1890: * overridings, subclasses should generally invoke
jaroslav@1890: * {@code super.terminated} within this method.
jaroslav@1890: */
jaroslav@1890: protected void terminated() { /* Default hook body is intentionally empty; subclasses may override it. */ }
jaroslav@1890:
jaroslav@1890: /* Predefined RejectedExecutionHandlers */
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * A handler for rejected tasks that runs the rejected task
jaroslav@1890: * directly in the calling thread of the {@code execute} method,
jaroslav@1890: * unless the executor has been shut down, in which case the task
jaroslav@1890: * is discarded.
jaroslav@1890: */
jaroslav@1890: public static class CallerRunsPolicy implements RejectedExecutionHandler {
jaroslav@1890:     /**
jaroslav@1890:      * Constructs a new {@code CallerRunsPolicy}.
jaroslav@1890:      */
jaroslav@1890:     public CallerRunsPolicy() { }
jaroslav@1890:
jaroslav@1890:     /**
jaroslav@1890:      * Runs the rejected task {@code r} synchronously on the thread that
jaroslav@1890:      * invoked this handler. If the executor has already been shut down,
jaroslav@1890:      * the task is dropped without being run.
jaroslav@1890:      *
jaroslav@1890:      * @param r the runnable task requested to be executed
jaroslav@1890:      * @param e the executor attempting to execute this task
jaroslav@1890:      */
jaroslav@1890:     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
jaroslav@1890:         if (e.isShutdown()) {
jaroslav@1890:             return; // executor is shut down: discard the task
jaroslav@1890:         }
jaroslav@1890:         r.run();
jaroslav@1890:     }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * A handler for rejected tasks that throws a
jaroslav@1890: * {@code RejectedExecutionException}.
jaroslav@1890: */
jaroslav@1890: public static class AbortPolicy implements RejectedExecutionHandler {
jaroslav@1890: /**
jaroslav@1890: * Creates an {@code AbortPolicy}.
jaroslav@1890: */
jaroslav@1890: public AbortPolicy() { }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Always throws RejectedExecutionException.
jaroslav@1890: *
jaroslav@1890: * @param r the runnable task requested to be executed
jaroslav@1890: * @param e the executor attempting to execute this task
jaroslav@1890: * @throws RejectedExecutionException always.
jaroslav@1890: */
jaroslav@1890: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
jaroslav@1890: throw new RejectedExecutionException("Task " + r.toString() +
jaroslav@1890: " rejected from " +
jaroslav@1890: e.toString());
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * A handler for rejected tasks that silently discards the
jaroslav@1890: * rejected task.
jaroslav@1890: */
jaroslav@1890: public static class DiscardPolicy implements RejectedExecutionHandler {
jaroslav@1890:     /**
jaroslav@1890:      * Constructs a new {@code DiscardPolicy}.
jaroslav@1890:      */
jaroslav@1890:     public DiscardPolicy() { }
jaroslav@1890:
jaroslav@1890:     /**
jaroslav@1890:      * Performs no action at all, so the rejected task r is simply
jaroslav@1890:      * dropped.
jaroslav@1890:      *
jaroslav@1890:      * @param r the runnable task requested to be executed
jaroslav@1890:      * @param e the executor attempting to execute this task
jaroslav@1890:      */
jaroslav@1890:     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
jaroslav@1890:         // Deliberately empty: doing nothing is what discards the task.
jaroslav@1890:     }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * A handler for rejected tasks that discards the oldest unhandled
jaroslav@1890: * request and then retries {@code execute}, unless the executor
jaroslav@1890: * is shut down, in which case the task is discarded.
jaroslav@1890: */
jaroslav@1890: public static class DiscardOldestPolicy implements RejectedExecutionHandler {
jaroslav@1890:     /**
jaroslav@1890:      * Constructs a new {@code DiscardOldestPolicy}.
jaroslav@1890:      */
jaroslav@1890:     public DiscardOldestPolicy() { }
jaroslav@1890:
jaroslav@1890:     /**
jaroslav@1890:      * Polls the executor's queue once, ignoring whatever task (if any)
jaroslav@1890:      * is returned, and then resubmits task r through {@code execute}.
jaroslav@1890:      * If the executor is shut down, neither step happens and task r is
jaroslav@1890:      * discarded instead.
jaroslav@1890:      *
jaroslav@1890:      * @param r the runnable task requested to be executed
jaroslav@1890:      * @param e the executor attempting to execute this task
jaroslav@1890:      */
jaroslav@1890:     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
jaroslav@1890:         if (e.isShutdown()) {
jaroslav@1890:             return; // executor is shut down: discard task r
jaroslav@1890:         }
jaroslav@1890:         // Drop the task at the head of the queue, if one is available,
jaroslav@1890:         // then retry the rejected task.
jaroslav@1890:         e.getQueue().poll();
jaroslav@1890:         e.execute(r);
jaroslav@1890:     }
jaroslav@1890: }
jaroslav@1890: }