/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *

Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each {@code ThreadPoolExecutor} also maintains some basic statistics, such as the number of completed tasks.

To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread pool, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor} (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class:

Core and maximum pool sizes

A {@code ThreadPoolExecutor} will automatically adjust the pool size (see {@link #getPoolSize}) according to the bounds set by corePoolSize (see {@link #getCorePoolSize}) and maximumPoolSize (see {@link #getMaximumPoolSize}). When a new task is submitted in method {@link #execute}, and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but fewer than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as {@code Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using {@link #setCorePoolSize} and {@link #setMaximumPoolSize}.
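
The sizing rules above can be observed directly. The following is a minimal sketch, not part of this class: the name {@code PoolSizingSketch} and the chosen sizes (one core thread, two maximum threads, a one-slot queue) are assumptions made for illustration. Each task blocks on a latch so that the pool state stays fixed while it is inspected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of java.util.concurrent.
class PoolSizingSketch {
    /** Returns {pool size after task 1, after task 2, after task 3, queue size}. */
    static int[] observe() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                    // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        int[] seen = new int[4];
        try {
            pool.execute(blocker);                  // below corePoolSize: a thread is started
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);                  // core thread busy: the task is queued
            seen[1] = pool.getPoolSize();
            pool.execute(blocker);                  // queue full: a second thread is started
            seen[2] = pool.getPoolSize();
            seen[3] = pool.getQueue().size();
        } finally {
            release.countDown();                    // let all blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }
}
```

In this configuration a fourth blocking task would be rejected, since both the thread bound and the queue bound are exhausted.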

On-demand construction

By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method {@link #prestartCoreThread} or {@link #prestartAllCoreThreads}. You probably want to prestart threads if you construct the pool with a non-empty queue.
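
As a small illustration of prestarting, the sketch below (the class name {@code PrestartSketch} and the pool size of three are assumptions) records the pool size before and after the two prestart calls.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of java.util.concurrent.
class PrestartSketch {
    /**
     * Returns {initial pool size, size after prestartCoreThread,
     * threads started by prestartAllCoreThreads, final pool size}.
     */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            3, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            int initial = pool.getPoolSize();                 // no tasks yet, so no threads
            pool.prestartCoreThread();                        // starts one idle core thread
            int afterOne = pool.getPoolSize();
            int startedByAll = pool.prestartAllCoreThreads(); // starts the remaining core threads
            return new int[] {initial, afterOne, startedByAll, pool.getPoolSize()};
        } finally {
            pool.shutdown();                                  // idle workers are interrupted and exit
        }
    }
}
```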

Creating new threads

New threads are created using a {@link ThreadFactory}. If not otherwise specified, an {@link Executors#defaultThreadFactory} is used, which creates threads that are all in the same {@link ThreadGroup}, with the same {@code NORM_PRIORITY} priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. If a {@code ThreadFactory} fails to create a thread when asked, by returning null from {@code newThread}, the executor will continue, but might not be able to execute any tasks. Threads should possess the "modifyThread" {@code RuntimePermission}. If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shutdown pool may remain in a state in which termination is possible but not completed.
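
A custom factory might look like the following sketch. The class name {@code NamedThreadFactorySketch}, the name prefix, and the choice of daemon threads are assumptions for illustration only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that names its threads and marks them as daemons.
class NamedThreadFactorySketch implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true);                        // differs from the default factory
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    /** Runs one task on a pool built with this factory and describes the worker thread. */
    static String describeWorker() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
            new NamedThreadFactorySketch("report-worker"));
        try {
            Callable<String> task = () ->
                Thread.currentThread().getName() + " daemon=" + Thread.currentThread().isDaemon();
            Future<String> result = pool.submit(task);
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```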

Keep-alive times

If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see {@link #getKeepAliveTime}). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method {@link #setKeepAliveTime}. Using a value of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method {@link #allowCoreThreadTimeOut(boolean)} can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.
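
The following sketch illustrates core-thread time-out. The class name {@code KeepAliveSketch}, the 50 ms keep-alive, and the 10 second polling deadline are assumptions; the first method depends on timing and therefore polls rather than asserting an exact delay.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of java.util.concurrent.
class KeepAliveSketch {
    /** Returns true if the idle core thread was reclaimed before the deadline. */
    static boolean coreThreadTimesOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);       // apply keepAliveTime to core threads too
        try {
            pool.execute(() -> { });             // forces creation of the core thread
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);                // wait for the idle thread to time out
            }
            return pool.getPoolSize() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    /** Returns true if enabling core time-out with a zero keep-alive time is refused. */
    static boolean zeroKeepAliveRefused() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            pool.allowCoreThreadTimeOut(true);   // keepAliveTime is zero here
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }
}
```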

Queuing

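Any {@link BlockingQueue} may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

  - If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  - If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  - If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

There are three general strategies for queuing: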

  1. Direct handoffs. A good default choice for a work queue is a {@link SynchronousQueue} that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
  2. Unbounded queues. Using an unbounded queue (for example a {@link LinkedBlockingQueue} without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each other's execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
  3. Bounded queues. A bounded queue (for example, an {@link ArrayBlockingQueue}) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

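
The first two strategies can be contrasted with a short sketch. The class name {@code QueueStrategySketch}, the pool parameters, and the task counts are assumptions; every task blocks on a latch so the pool and queue sizes can be read while all tasks are still pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of java.util.concurrent.
class QueueStrategySketch {
    /** Submits n blocking tasks and returns {pool size, queue size} while they are pending. */
    private static int[] submitBlockers(ThreadPoolExecutor pool, int n) {
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < n; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    /** Direct handoff: nothing is ever held in the queue, so each task gets its own thread. */
    static int[] directHandoff() {
        return submitBlockers(new ThreadPoolExecutor(
            0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()), 3);
    }

    /** Unbounded queue: the pool never grows past corePoolSize; extra tasks wait in the queue. */
    static int[] unboundedQueue() {
        return submitBlockers(new ThreadPoolExecutor(
            1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()), 5);
    }
}
```

With three blocked tasks the direct-handoff pool holds three threads and an empty queue, while the unbounded-queue pool stays at one thread with four queued tasks even though its maximumPoolSize is ten.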
Rejected tasks
New tasks submitted in method {@link #execute} will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the {@code execute} method invokes the {@link RejectedExecutionHandler#rejectedExecution} method of its {@link RejectedExecutionHandler}. Four predefined handler policies are provided:

  1. In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a runtime {@link RejectedExecutionException} upon rejection.
  2. In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes {@code execute} itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
  3. In {@link ThreadPoolExecutor.DiscardPolicy}, a task that cannot be executed is simply dropped.
  4. In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

It is possible to define and use other kinds of {@link RejectedExecutionHandler} classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
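
To make the difference between three of the predefined policies concrete, the sketch below saturates a pool with one thread and one queue slot and then submits one more task. The class name {@code RejectionSketch} and the returned labels are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of java.util.concurrent.
class RejectionSketch {
    /** Saturates the pool, submits one extra task, and reports what happened to it. */
    private static String overflow(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker);                                  // occupies the only thread
            pool.execute(blocker);                                  // fills the only queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread()));  // pool is saturated
            return ranOn.get() == Thread.currentThread() ? "ran in caller" : "discarded";
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    static String withAbortPolicy() {
        return overflow(new ThreadPoolExecutor.AbortPolicy());
    }

    static String withCallerRunsPolicy() {
        return overflow(new ThreadPoolExecutor.CallerRunsPolicy());
    }

    static String withDiscardPolicy() {
        return overflow(new ThreadPoolExecutor.DiscardPolicy());
    }
}
```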

Hook methods

This class provides {@code protected} overridable {@link #beforeExecute} and {@link #afterExecute} methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method {@link #terminated} can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.

If hook or callback methods throw exceptions, internal worker jaroslav@1890: * threads may in turn fail and abruptly terminate.
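
As one possible use of the hooks for gathering statistics, the sketch below counts hook invocations. The class name {@code CountingExecutorSketch} and its fields are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts hook invocations.
class CountingExecutorSketch extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    volatile boolean terminatedCalled;

    CountingExecutorSketch(int threads) {
        super(threads, threads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();       // runs in the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();      // runs in the worker thread, after the task
    }

    @Override
    protected void terminated() {
        terminatedCalled = true;         // runs once, when the pool has fully terminated
        super.terminated();
    }

    /** Runs n empty tasks and returns {started, finished, 1 if terminated() ran else 0}. */
    static int[] runTasks(int n) {
        CountingExecutorSketch pool = new CountingExecutorSketch(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {pool.started.get(), pool.finished.get(), pool.terminatedCalled ? 1 : 0};
    }
}
```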

Queue maintenance

Method {@link #getQueue} allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, {@link #remove} and {@link #purge}, are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
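
The effect of {@code purge} can be seen in the following sketch. The class name {@code PurgeSketch} and the task counts are assumptions; the queue is read through {@code getQueue} for monitoring only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of java.util.concurrent.
class PurgeSketch {
    /** Returns {queue size after cancelling three queued tasks, queue size after purge}. */
    static int[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            pool.execute(() -> {                 // keeps the only worker busy
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            List<Future<?>> queued = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                queued.add(pool.submit(() -> { }));  // these wait in the work queue
            }
            for (Future<?> f : queued) {
                f.cancel(false);                 // cancelled, but still held by the queue
            }
            int afterCancel = pool.getQueue().size();
            pool.purge();                        // drops cancelled Future tasks from the queue
            return new int[] {afterCancel, pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```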

Finalization

A pool that is no longer referenced in a program AND has no remaining threads will be {@code shutdown} automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call {@link #shutdown}, then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting {@link #allowCoreThreadTimeOut(boolean)}.

Extension example. Most extensions of this class override one or more of the protected hook methods. For example, here is a subclass that adds a simple pause/resume feature:

 {@code
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
  private boolean isPaused;
  private ReentrantLock pauseLock = new ReentrantLock();
  private Condition unpaused = pauseLock.newCondition();

  public PausableThreadPoolExecutor(...) { super(...); }

  protected void beforeExecute(Thread t, Runnable r) {
    super.beforeExecute(t, r);
    pauseLock.lock();
    try {
      while (isPaused) unpaused.await();
    } catch (InterruptedException ie) {
      t.interrupt();
    } finally {
      pauseLock.unlock();
    }
  }

  public void pause() {
    pauseLock.lock();
    try {
      isPaused = true;
    } finally {
      pauseLock.unlock();
    }
  }

  public void resume() {
    pauseLock.lock();
    try {
      isPaused = false;
      unpaused.signalAll();
    } finally {
      pauseLock.unlock();
    }
  }
}}
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop. The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempt to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempt to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads. We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING). This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc.
     * We also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker). All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.
     * Even though it is not treated as an error, failure to create
     * threads may result in new tasks being rejected or existing ones
     * remaining stuck in the queue. On the other hand, no special
     * precautions exist to handle OutOfMemoryErrors that might be
     * thrown while trying to create threads, since there is generally
     * no recourse from within this class.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
//    private static final RuntimePermission shutdownPerm =
//        new RuntimePermission("modifyThread");

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution. This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run. We implement a simple
     * non-reentrant mutual exclusion lock rather than use ReentrantLock
     * because we do not want worker tasks to be able to reacquire the
     * lock when they invoke pool control methods like setCorePoolSize.
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    }

    /*
     * Methods for setting control state
     */

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
jaroslav@1890: * jaroslav@1890: * @param targetState the desired state, either SHUTDOWN or STOP jaroslav@1890: * (but not TIDYING or TERMINATED -- use tryTerminate for that) jaroslav@1890: */ jaroslav@1890: private void advanceRunState(int targetState) { jaroslav@1890: for (;;) { jaroslav@1890: int c = ctl.get(); jaroslav@1890: if (runStateAtLeast(c, targetState) || jaroslav@1890: ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) jaroslav@1890: break; jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Transitions to TERMINATED state if either (SHUTDOWN and pool jaroslav@1890: * and queue empty) or (STOP and pool empty). If otherwise jaroslav@1890: * eligible to terminate but workerCount is nonzero, interrupts an jaroslav@1890: * idle worker to ensure that shutdown signals propagate. This jaroslav@1890: * method must be called following any action that might make jaroslav@1890: * termination possible -- reducing worker count or removing tasks jaroslav@1890: * from the queue during shutdown. The method is non-private to jaroslav@1890: * allow access from ScheduledThreadPoolExecutor. jaroslav@1890: */ jaroslav@1890: final void tryTerminate() { jaroslav@1890: for (;;) { jaroslav@1890: int c = ctl.get(); jaroslav@1890: if (isRunning(c) || jaroslav@1890: runStateAtLeast(c, TIDYING) || jaroslav@1890: (runStateOf(c) == SHUTDOWN && ! 
workQueue.isEmpty())) jaroslav@1890: return; jaroslav@1890: if (workerCountOf(c) != 0) { // Eligible to terminate jaroslav@1890: interruptIdleWorkers(ONLY_ONE); jaroslav@1890: return; jaroslav@1890: } jaroslav@1890: jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { jaroslav@1890: try { jaroslav@1890: terminated(); jaroslav@1890: } finally { jaroslav@1890: ctl.set(ctlOf(TERMINATED, 0)); jaroslav@1890: termination.signalAll(); jaroslav@1890: } jaroslav@1890: return; jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: // else retry on failed CAS jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * Methods for controlling interrupts to worker threads. jaroslav@1890: */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * If there is a security manager, makes sure caller has jaroslav@1890: * permission to shut down threads in general (see shutdownPerm). jaroslav@1890: * If this passes, additionally makes sure the caller is allowed jaroslav@1890: * to interrupt each worker thread. This might not be true even if jaroslav@1890: * first check passed, if the SecurityManager treats some threads jaroslav@1890: * specially. 
jaroslav@1890: */ jaroslav@1890: private void checkShutdownAccess() { jaroslav@1895: // SecurityManager security = System.getSecurityManager(); jaroslav@1895: // if (security != null) { jaroslav@1895: // security.checkPermission(shutdownPerm); jaroslav@1895: // final ReentrantLock mainLock = this.mainLock; jaroslav@1895: // mainLock.lock(); jaroslav@1895: // try { jaroslav@1895: // for (Worker w : workers) jaroslav@1895: // security.checkAccess(w.thread); jaroslav@1895: // } finally { jaroslav@1895: // mainLock.unlock(); jaroslav@1895: // } jaroslav@1895: // } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Interrupts all threads, even if active. Ignores SecurityExceptions jaroslav@1890: * (in which case some threads may remain uninterrupted). jaroslav@1890: */ jaroslav@1890: private void interruptWorkers() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: for (Worker w : workers) { jaroslav@1890: try { jaroslav@1890: w.thread.interrupt(); jaroslav@1890: } catch (SecurityException ignore) { jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Interrupts threads that might be waiting for tasks (as jaroslav@1890: * indicated by not being locked) so they can check for jaroslav@1890: * termination or configuration changes. Ignores jaroslav@1890: * SecurityExceptions (in which case some threads may remain jaroslav@1890: * uninterrupted). jaroslav@1890: * jaroslav@1890: * @param onlyOne If true, interrupt at most one worker. This is jaroslav@1890: * called only from tryTerminate when termination is otherwise jaroslav@1890: * enabled but there are still other workers. In this case, at jaroslav@1890: * most one waiting worker is interrupted to propagate shutdown jaroslav@1890: * signals in case all threads are currently waiting. 
jaroslav@1890: * Interrupting any arbitrary thread ensures that newly arriving jaroslav@1890: * workers since shutdown began will also eventually exit. jaroslav@1890: * To guarantee eventual termination, it suffices to always jaroslav@1890: * interrupt only one idle worker, but shutdown() interrupts all jaroslav@1890: * idle workers so that redundant workers exit promptly, not jaroslav@1890: * waiting for a straggler task to finish. jaroslav@1890: */ jaroslav@1890: private void interruptIdleWorkers(boolean onlyOne) { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: for (Worker w : workers) { jaroslav@1890: Thread t = w.thread; jaroslav@1890: if (!t.isInterrupted() && w.tryLock()) { jaroslav@1890: try { jaroslav@1890: t.interrupt(); jaroslav@1890: } catch (SecurityException ignore) { jaroslav@1890: } finally { jaroslav@1890: w.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: if (onlyOne) jaroslav@1890: break; jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Common form of interruptIdleWorkers, to avoid having to jaroslav@1890: * remember what the boolean argument means. jaroslav@1890: */ jaroslav@1890: private void interruptIdleWorkers() { jaroslav@1890: interruptIdleWorkers(false); jaroslav@1890: } jaroslav@1890: jaroslav@1890: private static final boolean ONLY_ONE = true; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Ensures that unless the pool is stopping, the current thread jaroslav@1890: * does not have its interrupt set. This requires a double-check jaroslav@1890: * of state in case the interrupt was cleared concurrently with a jaroslav@1890: * shutdownNow -- if so, the interrupt is re-enabled. 
jaroslav@1890: */ jaroslav@1890: private void clearInterruptsForTaskRun() { jaroslav@1890: if (runStateLessThan(ctl.get(), STOP) && jaroslav@1890: Thread.interrupted() && jaroslav@1890: runStateAtLeast(ctl.get(), STOP)) jaroslav@1890: Thread.currentThread().interrupt(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * Misc utilities, most of which are also exported to jaroslav@1890: * ScheduledThreadPoolExecutor jaroslav@1890: */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Invokes the rejected execution handler for the given command. jaroslav@1890: * Package-protected for use by ScheduledThreadPoolExecutor. jaroslav@1890: */ jaroslav@1890: final void reject(Runnable command) { jaroslav@1890: handler.rejectedExecution(command, this); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Performs any further cleanup following run state transition on jaroslav@1890: * invocation of shutdown. A no-op here, but used by jaroslav@1890: * ScheduledThreadPoolExecutor to cancel delayed tasks. jaroslav@1890: */ jaroslav@1890: void onShutdown() { jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * State check needed by ScheduledThreadPoolExecutor to jaroslav@1890: * enable running tasks during shutdown. jaroslav@1890: * jaroslav@1890: * @param shutdownOK true if should return true if SHUTDOWN jaroslav@1890: */ jaroslav@1890: final boolean isRunningOrShutdown(boolean shutdownOK) { jaroslav@1890: int rs = runStateOf(ctl.get()); jaroslav@1890: return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Drains the task queue into a new list, normally using jaroslav@1890: * drainTo. But if the queue is a DelayQueue or any other kind of jaroslav@1890: * queue for which poll or drainTo may fail to remove some jaroslav@1890: * elements, it deletes them one by one. 
jaroslav@1890: */ jaroslav@1890: private List<Runnable> drainQueue() { jaroslav@1890: BlockingQueue<Runnable> q = workQueue; jaroslav@1890: List<Runnable> taskList = new ArrayList<Runnable>(); jaroslav@1890: q.drainTo(taskList); jaroslav@1890: if (!q.isEmpty()) { jaroslav@1890: for (Runnable r : q.toArray(new Runnable[0])) { jaroslav@1890: if (q.remove(r)) jaroslav@1890: taskList.add(r); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: return taskList; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * Methods for creating, running and cleaning up after workers jaroslav@1890: */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Checks if a new worker can be added with respect to current jaroslav@1890: * pool state and the given bound (either core or maximum). If so, jaroslav@1890: * the worker count is adjusted accordingly, and, if possible, a jaroslav@1890: * new worker is created and started running firstTask as its jaroslav@1890: * first task. This method returns false if the pool is stopped or jaroslav@1890: * eligible to shut down. It also returns false if the thread jaroslav@1890: * factory fails to create a thread when asked, which requires a jaroslav@1890: * backout of workerCount, and a recheck for termination, in case jaroslav@1890: * the existence of this worker was holding up termination. jaroslav@1890: * jaroslav@1890: * @param firstTask the task the new thread should run first (or jaroslav@1890: * null if none). Workers are created with an initial first task jaroslav@1890: * (in method execute()) to bypass queuing when there are fewer jaroslav@1890: * than corePoolSize threads (in which case we always start one), jaroslav@1890: * or when the queue is full (in which case we must bypass queue). jaroslav@1890: * Initially idle threads are usually created via jaroslav@1890: * prestartCoreThread or to replace other dying workers. jaroslav@1890: * jaroslav@1890: * @param core if true use corePoolSize as bound, else jaroslav@1890: * maximumPoolSize.
(A boolean indicator is used here rather than a jaroslav@1890: * value to ensure reads of fresh values after checking other pool jaroslav@1890: * state). jaroslav@1890: * @return true if successful jaroslav@1890: */ jaroslav@1890: private boolean addWorker(Runnable firstTask, boolean core) { jaroslav@1890: retry: jaroslav@1890: for (;;) { jaroslav@1890: int c = ctl.get(); jaroslav@1890: int rs = runStateOf(c); jaroslav@1890: jaroslav@1890: // Check if queue empty only if necessary. jaroslav@1890: if (rs >= SHUTDOWN && jaroslav@1890: ! (rs == SHUTDOWN && jaroslav@1890: firstTask == null && jaroslav@1890: ! workQueue.isEmpty())) jaroslav@1890: return false; jaroslav@1890: jaroslav@1890: for (;;) { jaroslav@1890: int wc = workerCountOf(c); jaroslav@1890: if (wc >= CAPACITY || jaroslav@1890: wc >= (core ? corePoolSize : maximumPoolSize)) jaroslav@1890: return false; jaroslav@1890: if (compareAndIncrementWorkerCount(c)) jaroslav@1890: break retry; jaroslav@1890: c = ctl.get(); // Re-read ctl jaroslav@1890: if (runStateOf(c) != rs) jaroslav@1890: continue retry; jaroslav@1890: // else CAS failed due to workerCount change; retry inner loop jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: Worker w = new Worker(firstTask); jaroslav@1890: Thread t = w.thread; jaroslav@1890: jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: // Recheck while holding lock. jaroslav@1890: // Back out on ThreadFactory failure or if jaroslav@1890: // shut down before lock acquired. jaroslav@1890: int c = ctl.get(); jaroslav@1890: int rs = runStateOf(c); jaroslav@1890: jaroslav@1890: if (t == null || jaroslav@1890: (rs >= SHUTDOWN && jaroslav@1890: ! 
(rs == SHUTDOWN && jaroslav@1890: firstTask == null))) { jaroslav@1890: decrementWorkerCount(); jaroslav@1890: tryTerminate(); jaroslav@1890: return false; jaroslav@1890: } jaroslav@1890: jaroslav@1890: workers.add(w); jaroslav@1890: jaroslav@1890: int s = workers.size(); jaroslav@1890: if (s > largestPoolSize) jaroslav@1890: largestPoolSize = s; jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: t.start(); jaroslav@1890: // It is possible (but unlikely) for a thread to have been jaroslav@1890: // added to workers, but not yet started, during transition to jaroslav@1890: // STOP, which could result in a rare missed interrupt, jaroslav@1890: // because Thread.interrupt is not guaranteed to have any effect jaroslav@1890: // on a not-yet-started Thread (see Thread#interrupt). jaroslav@1890: if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) jaroslav@1890: t.interrupt(); jaroslav@1890: jaroslav@1890: return true; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Performs cleanup and bookkeeping for a dying worker. Called jaroslav@1890: * only from worker threads. Unless completedAbruptly is set, jaroslav@1890: * assumes that workerCount has already been adjusted to account jaroslav@1890: * for exit. This method removes thread from worker set, and jaroslav@1890: * possibly terminates the pool or replaces the worker if either jaroslav@1890: * it exited due to user task exception or if fewer than jaroslav@1890: * corePoolSize workers are running or queue is non-empty but jaroslav@1890: * there are no workers.
jaroslav@1890: * jaroslav@1890: * @param w the worker jaroslav@1890: * @param completedAbruptly if the worker died due to user exception jaroslav@1890: */ jaroslav@1890: private void processWorkerExit(Worker w, boolean completedAbruptly) { jaroslav@1890: if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted jaroslav@1890: decrementWorkerCount(); jaroslav@1890: jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: completedTaskCount += w.completedTasks; jaroslav@1890: workers.remove(w); jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: tryTerminate(); jaroslav@1890: jaroslav@1890: int c = ctl.get(); jaroslav@1890: if (runStateLessThan(c, STOP)) { jaroslav@1890: if (!completedAbruptly) { jaroslav@1890: int min = allowCoreThreadTimeOut ? 0 : corePoolSize; jaroslav@1890: if (min == 0 && ! workQueue.isEmpty()) jaroslav@1890: min = 1; jaroslav@1890: if (workerCountOf(c) >= min) jaroslav@1890: return; // replacement not needed jaroslav@1890: } jaroslav@1890: addWorker(null, false); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Performs blocking or timed wait for a task, depending on jaroslav@1890: * current configuration settings, or returns null if this worker jaroslav@1890: * must exit because of any of: jaroslav@1890: * 1. There are more than maximumPoolSize workers (due to jaroslav@1890: * a call to setMaximumPoolSize). jaroslav@1890: * 2. The pool is stopped. jaroslav@1890: * 3. The pool is shutdown and the queue is empty. jaroslav@1890: * 4. This worker timed out waiting for a task, and timed-out jaroslav@1890: * workers are subject to termination (that is, jaroslav@1890: * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) jaroslav@1890: * both before and after the timed wait. 
jaroslav@1890: * jaroslav@1890: * @return task, or null if the worker must exit, in which case jaroslav@1890: * workerCount is decremented jaroslav@1890: */ jaroslav@1890: private Runnable getTask() { jaroslav@1890: boolean timedOut = false; // Did the last poll() time out? jaroslav@1890: jaroslav@1890: retry: jaroslav@1890: for (;;) { jaroslav@1890: int c = ctl.get(); jaroslav@1890: int rs = runStateOf(c); jaroslav@1890: jaroslav@1890: // Check if queue empty only if necessary. jaroslav@1890: if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { jaroslav@1890: decrementWorkerCount(); jaroslav@1890: return null; jaroslav@1890: } jaroslav@1890: jaroslav@1890: boolean timed; // Are workers subject to culling? jaroslav@1890: jaroslav@1890: for (;;) { jaroslav@1890: int wc = workerCountOf(c); jaroslav@1890: timed = allowCoreThreadTimeOut || wc > corePoolSize; jaroslav@1890: jaroslav@1890: if (wc <= maximumPoolSize && ! (timedOut && timed)) jaroslav@1890: break; jaroslav@1890: if (compareAndDecrementWorkerCount(c)) jaroslav@1890: return null; jaroslav@1890: c = ctl.get(); // Re-read ctl jaroslav@1890: if (runStateOf(c) != rs) jaroslav@1890: continue retry; jaroslav@1890: // else CAS failed due to workerCount change; retry inner loop jaroslav@1890: } jaroslav@1890: jaroslav@1890: try { jaroslav@1890: Runnable r = timed ? jaroslav@1890: workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : jaroslav@1890: workQueue.take(); jaroslav@1890: if (r != null) jaroslav@1890: return r; jaroslav@1890: timedOut = true; jaroslav@1890: } catch (InterruptedException retry) { jaroslav@1890: timedOut = false; jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Main worker run loop. Repeatedly gets tasks from queue and jaroslav@1890: * executes them, while coping with a number of issues: jaroslav@1890: * jaroslav@1890: * 1. We may start out with an initial task, in which case we jaroslav@1890: * don't need to get the first one. 
Otherwise, as long as pool is jaroslav@1890: * running, we get tasks from getTask. If it returns null then the jaroslav@1890: * worker exits due to changed pool state or configuration jaroslav@1890: * parameters. Other exits result from exception throws in jaroslav@1890: * external code, in which case completedAbruptly holds, which jaroslav@1890: * usually leads processWorkerExit to replace this thread. jaroslav@1890: * jaroslav@1890: * 2. Before running any task, the lock is acquired to prevent jaroslav@1890: * other pool interrupts while the task is executing, and jaroslav@1890: * clearInterruptsForTaskRun called to ensure that unless pool is jaroslav@1890: * stopping, this thread does not have its interrupt set. jaroslav@1890: * jaroslav@1890: * 3. Each task run is preceded by a call to beforeExecute, which jaroslav@1890: * might throw an exception, in which case we cause thread to die jaroslav@1890: * (breaking loop with completedAbruptly true) without processing jaroslav@1890: * the task. jaroslav@1890: * jaroslav@1890: * 4. Assuming beforeExecute completes normally, we run the task, jaroslav@1890: * gathering any of its thrown exceptions to send to jaroslav@1890: * afterExecute. We separately handle RuntimeException, Error jaroslav@1890: * (both of which the specs guarantee that we trap) and arbitrary jaroslav@1890: * Throwables. Because we cannot rethrow Throwables within jaroslav@1890: * Runnable.run, we wrap them within Errors on the way out (to the jaroslav@1890: * thread's UncaughtExceptionHandler). Any thrown exception also jaroslav@1890: * conservatively causes thread to die. jaroslav@1890: * jaroslav@1890: * 5. After task.run completes, we call afterExecute, which may jaroslav@1890: * also throw an exception, which will also cause thread to jaroslav@1890: * die. According to JLS Sec 14.20, this exception is the one that jaroslav@1890: * will be in effect even if task.run throws. 
jaroslav@1890: * jaroslav@1890: * The net effect of the exception mechanics is that afterExecute jaroslav@1890: * and the thread's UncaughtExceptionHandler have as accurate jaroslav@1890: * information as we can provide about any problems encountered by jaroslav@1890: * user code. jaroslav@1890: * jaroslav@1890: * @param w the worker jaroslav@1890: */ jaroslav@1890: final void runWorker(Worker w) { jaroslav@1890: Runnable task = w.firstTask; jaroslav@1890: w.firstTask = null; jaroslav@1890: boolean completedAbruptly = true; jaroslav@1890: try { jaroslav@1890: while (task != null || (task = getTask()) != null) { jaroslav@1890: w.lock(); jaroslav@1890: clearInterruptsForTaskRun(); jaroslav@1890: try { jaroslav@1890: beforeExecute(w.thread, task); jaroslav@1890: Throwable thrown = null; jaroslav@1890: try { jaroslav@1890: task.run(); jaroslav@1890: } catch (RuntimeException x) { jaroslav@1890: thrown = x; throw x; jaroslav@1890: } catch (Error x) { jaroslav@1890: thrown = x; throw x; jaroslav@1890: } catch (Throwable x) { jaroslav@1890: thrown = x; throw new Error(x); jaroslav@1890: } finally { jaroslav@1890: afterExecute(task, thrown); jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: task = null; jaroslav@1890: w.completedTasks++; jaroslav@1890: w.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: completedAbruptly = false; jaroslav@1890: } finally { jaroslav@1890: processWorkerExit(w, completedAbruptly); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: // Public constructors and methods jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new {@code ThreadPoolExecutor} with the given initial jaroslav@1890: * parameters and default thread factory and rejected execution handler. jaroslav@1890: * It may be more convenient to use one of the {@link Executors} factory jaroslav@1890: * methods instead of this general purpose constructor. 
jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @param maximumPoolSize the maximum number of threads to allow in the jaroslav@1890: * pool jaroslav@1890: * @param keepAliveTime when the number of threads is greater than jaroslav@1890: * the core, this is the maximum time that excess idle threads jaroslav@1890: * will wait for new tasks before terminating. jaroslav@1890: * @param unit the time unit for the {@code keepAliveTime} argument jaroslav@1890: * @param workQueue the queue to use for holding tasks before they are jaroslav@1890: * executed. This queue will hold only the {@code Runnable} jaroslav@1890: * tasks submitted by the {@code execute} method. jaroslav@1890: * @throws IllegalArgumentException if one of the following holds:<br>
jaroslav@1890: * {@code corePoolSize < 0}<br>
jaroslav@1890: * {@code keepAliveTime < 0}<br>
jaroslav@1890: * {@code maximumPoolSize <= 0}<br>
jaroslav@1890: * {@code maximumPoolSize < corePoolSize} jaroslav@1890: * @throws NullPointerException if {@code workQueue} is null jaroslav@1890: */ jaroslav@1890: public ThreadPoolExecutor(int corePoolSize, jaroslav@1890: int maximumPoolSize, jaroslav@1890: long keepAliveTime, jaroslav@1890: TimeUnit unit, jaroslav@1890: BlockingQueue<Runnable> workQueue) { jaroslav@1890: this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, jaroslav@1890: Executors.defaultThreadFactory(), defaultHandler); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new {@code ThreadPoolExecutor} with the given initial jaroslav@1890: * parameters and default rejected execution handler. jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @param maximumPoolSize the maximum number of threads to allow in the jaroslav@1890: * pool jaroslav@1890: * @param keepAliveTime when the number of threads is greater than jaroslav@1890: * the core, this is the maximum time that excess idle threads jaroslav@1890: * will wait for new tasks before terminating. jaroslav@1890: * @param unit the time unit for the {@code keepAliveTime} argument jaroslav@1890: * @param workQueue the queue to use for holding tasks before they are jaroslav@1890: * executed. This queue will hold only the {@code Runnable} jaroslav@1890: * tasks submitted by the {@code execute} method. jaroslav@1890: * @param threadFactory the factory to use when the executor jaroslav@1890: * creates a new thread jaroslav@1890: * @throws IllegalArgumentException if one of the following holds:<br>
jaroslav@1890: * {@code corePoolSize < 0}<br>
jaroslav@1890: * {@code keepAliveTime < 0}<br>
jaroslav@1890: * {@code maximumPoolSize <= 0}<br>
jaroslav@1890: * {@code maximumPoolSize < corePoolSize} jaroslav@1890: * @throws NullPointerException if {@code workQueue} jaroslav@1890: * or {@code threadFactory} is null jaroslav@1890: */ jaroslav@1890: public ThreadPoolExecutor(int corePoolSize, jaroslav@1890: int maximumPoolSize, jaroslav@1890: long keepAliveTime, jaroslav@1890: TimeUnit unit, jaroslav@1890: BlockingQueue<Runnable> workQueue, jaroslav@1890: ThreadFactory threadFactory) { jaroslav@1890: this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, jaroslav@1890: threadFactory, defaultHandler); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new {@code ThreadPoolExecutor} with the given initial jaroslav@1890: * parameters and default thread factory. jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @param maximumPoolSize the maximum number of threads to allow in the jaroslav@1890: * pool jaroslav@1890: * @param keepAliveTime when the number of threads is greater than jaroslav@1890: * the core, this is the maximum time that excess idle threads jaroslav@1890: * will wait for new tasks before terminating. jaroslav@1890: * @param unit the time unit for the {@code keepAliveTime} argument jaroslav@1890: * @param workQueue the queue to use for holding tasks before they are jaroslav@1890: * executed. This queue will hold only the {@code Runnable} jaroslav@1890: * tasks submitted by the {@code execute} method. jaroslav@1890: * @param handler the handler to use when execution is blocked jaroslav@1890: * because the thread bounds and queue capacities are reached jaroslav@1890: * @throws IllegalArgumentException if one of the following holds:<br>
jaroslav@1890: * {@code corePoolSize < 0}<br>
jaroslav@1890: * {@code keepAliveTime < 0}<br>
jaroslav@1890: * {@code maximumPoolSize <= 0}<br>
jaroslav@1890: * {@code maximumPoolSize < corePoolSize} jaroslav@1890: * @throws NullPointerException if {@code workQueue} jaroslav@1890: * or {@code handler} is null jaroslav@1890: */ jaroslav@1890: public ThreadPoolExecutor(int corePoolSize, jaroslav@1890: int maximumPoolSize, jaroslav@1890: long keepAliveTime, jaroslav@1890: TimeUnit unit, jaroslav@1890: BlockingQueue<Runnable> workQueue, jaroslav@1890: RejectedExecutionHandler handler) { jaroslav@1890: this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, jaroslav@1890: Executors.defaultThreadFactory(), handler); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new {@code ThreadPoolExecutor} with the given initial jaroslav@1890: * parameters. jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @param maximumPoolSize the maximum number of threads to allow in the jaroslav@1890: * pool jaroslav@1890: * @param keepAliveTime when the number of threads is greater than jaroslav@1890: * the core, this is the maximum time that excess idle threads jaroslav@1890: * will wait for new tasks before terminating. jaroslav@1890: * @param unit the time unit for the {@code keepAliveTime} argument jaroslav@1890: * @param workQueue the queue to use for holding tasks before they are jaroslav@1890: * executed. This queue will hold only the {@code Runnable} jaroslav@1890: * tasks submitted by the {@code execute} method. jaroslav@1890: * @param threadFactory the factory to use when the executor jaroslav@1890: * creates a new thread jaroslav@1890: * @param handler the handler to use when execution is blocked jaroslav@1890: * because the thread bounds and queue capacities are reached jaroslav@1890: * @throws IllegalArgumentException if one of the following holds:<br>
jaroslav@1890: * {@code corePoolSize < 0}<br>
jaroslav@1890: * {@code keepAliveTime < 0}<br>
jaroslav@1890: * {@code maximumPoolSize <= 0}<br>
jaroslav@1890: * {@code maximumPoolSize < corePoolSize} jaroslav@1890: * @throws NullPointerException if {@code workQueue} jaroslav@1890: * or {@code threadFactory} or {@code handler} is null jaroslav@1890: */ jaroslav@1890: public ThreadPoolExecutor(int corePoolSize, jaroslav@1890: int maximumPoolSize, jaroslav@1890: long keepAliveTime, jaroslav@1890: TimeUnit unit, jaroslav@1890: BlockingQueue<Runnable> workQueue, jaroslav@1890: ThreadFactory threadFactory, jaroslav@1890: RejectedExecutionHandler handler) { jaroslav@1890: if (corePoolSize < 0 || jaroslav@1890: maximumPoolSize <= 0 || jaroslav@1890: maximumPoolSize < corePoolSize || jaroslav@1890: keepAliveTime < 0) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: if (workQueue == null || threadFactory == null || handler == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: this.corePoolSize = corePoolSize; jaroslav@1890: this.maximumPoolSize = maximumPoolSize; jaroslav@1890: this.workQueue = workQueue; jaroslav@1890: this.keepAliveTime = unit.toNanos(keepAliveTime); jaroslav@1890: this.threadFactory = threadFactory; jaroslav@1890: this.handler = handler; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Executes the given task sometime in the future. The task jaroslav@1890: * may execute in a new thread or in an existing pooled thread. jaroslav@1890: * jaroslav@1890: * If the task cannot be submitted for execution, either because this jaroslav@1890: * executor has been shutdown or because its capacity has been reached, jaroslav@1890: * the task is handled by the current {@code RejectedExecutionHandler}.
jaroslav@1890: * jaroslav@1890: * @param command the task to execute jaroslav@1890: * @throws RejectedExecutionException at discretion of jaroslav@1890: * {@code RejectedExecutionHandler}, if the task jaroslav@1890: * cannot be accepted for execution jaroslav@1890: * @throws NullPointerException if {@code command} is null jaroslav@1890: */ jaroslav@1890: public void execute(Runnable command) { jaroslav@1890: if (command == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: /* jaroslav@1890: * Proceed in 3 steps: jaroslav@1890: * jaroslav@1890: * 1. If fewer than corePoolSize threads are running, try to jaroslav@1890: * start a new thread with the given command as its first jaroslav@1890: * task. The call to addWorker atomically checks runState and jaroslav@1890: * workerCount, and so prevents false alarms that would add jaroslav@1890: * threads when it shouldn't, by returning false. jaroslav@1890: * jaroslav@1890: * 2. If a task can be successfully queued, then we still need jaroslav@1890: * to double-check whether we should have added a thread jaroslav@1890: * (because existing ones died since last checking) or that jaroslav@1890: * the pool shut down since entry into this method. So we jaroslav@1890: * recheck state and if necessary roll back the enqueuing if jaroslav@1890: * stopped, or start a new thread if there are none. jaroslav@1890: * jaroslav@1890: * 3. If we cannot queue task, then we try to add a new jaroslav@1890: * thread. If it fails, we know we are shut down or saturated jaroslav@1890: * and so reject the task. jaroslav@1890: */ jaroslav@1890: int c = ctl.get(); jaroslav@1890: if (workerCountOf(c) < corePoolSize) { jaroslav@1890: if (addWorker(command, true)) jaroslav@1890: return; jaroslav@1890: c = ctl.get(); jaroslav@1890: } jaroslav@1890: if (isRunning(c) && workQueue.offer(command)) { jaroslav@1890: int recheck = ctl.get(); jaroslav@1890: if (! 
isRunning(recheck) && remove(command)) jaroslav@1890: reject(command); jaroslav@1890: else if (workerCountOf(recheck) == 0) jaroslav@1890: addWorker(null, false); jaroslav@1890: } jaroslav@1890: else if (!addWorker(command, false)) jaroslav@1890: reject(command); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Initiates an orderly shutdown in which previously submitted jaroslav@1890: * tasks are executed, but no new tasks will be accepted. jaroslav@1890: * Invocation has no additional effect if already shut down. jaroslav@1890: * jaroslav@1890: *

<p>This method does not wait for previously submitted tasks to jaroslav@1890: * complete execution. Use {@link #awaitTermination awaitTermination} jaroslav@1890: * to do that. jaroslav@1890: * jaroslav@1890: * @throws SecurityException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public void shutdown() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: checkShutdownAccess(); jaroslav@1890: advanceRunState(SHUTDOWN); jaroslav@1890: interruptIdleWorkers(); jaroslav@1890: onShutdown(); // hook for ScheduledThreadPoolExecutor jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: tryTerminate(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Attempts to stop all actively executing tasks, halts the jaroslav@1890: * processing of waiting tasks, and returns a list of the tasks jaroslav@1890: * that were awaiting execution. These tasks are drained (removed) jaroslav@1890: * from the task queue upon return from this method. jaroslav@1890: * jaroslav@1890: *

<p>This method does not wait for actively executing tasks to jaroslav@1890: * terminate. Use {@link #awaitTermination awaitTermination} to jaroslav@1890: * do that. jaroslav@1890: * jaroslav@1890: *

<p>There are no guarantees beyond best-effort attempts to stop jaroslav@1890: * processing actively executing tasks. This implementation jaroslav@1890: * cancels tasks via {@link Thread#interrupt}, so any task that jaroslav@1890: * fails to respond to interrupts may never terminate. jaroslav@1890: * jaroslav@1890: * @throws SecurityException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public List<Runnable> shutdownNow() { jaroslav@1890: List<Runnable> tasks; jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: checkShutdownAccess(); jaroslav@1890: advanceRunState(STOP); jaroslav@1890: interruptWorkers(); jaroslav@1890: tasks = drainQueue(); jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: tryTerminate(); jaroslav@1890: return tasks; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean isShutdown() { jaroslav@1890: return ! isRunning(ctl.get()); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns true if this executor is in the process of terminating jaroslav@1890: * after {@link #shutdown} or {@link #shutdownNow} but has not jaroslav@1890: * completely terminated. This method may be useful for jaroslav@1890: * debugging. A return of {@code true} reported a sufficient jaroslav@1890: * period after shutdown may indicate that submitted tasks have jaroslav@1890: * ignored or suppressed interruption, causing this executor not jaroslav@1890: * to properly terminate. jaroslav@1890: * jaroslav@1890: * @return true if terminating but not yet terminated jaroslav@1890: */ jaroslav@1890: public boolean isTerminating() { jaroslav@1890: int c = ctl.get(); jaroslav@1890: return ! 
isRunning(c) && runStateLessThan(c, TERMINATED); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean isTerminated() { jaroslav@1890: return runStateAtLeast(ctl.get(), TERMINATED); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean awaitTermination(long timeout, TimeUnit unit) jaroslav@1890: throws InterruptedException { jaroslav@1890: long nanos = unit.toNanos(timeout); jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: for (;;) { jaroslav@1890: if (runStateAtLeast(ctl.get(), TERMINATED)) jaroslav@1890: return true; jaroslav@1890: if (nanos <= 0) jaroslav@1890: return false; jaroslav@1890: nanos = termination.awaitNanos(nanos); jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Invokes {@code shutdown} when this executor is no longer jaroslav@1890: * referenced and it has no threads. jaroslav@1890: */ jaroslav@1890: protected void finalize() { jaroslav@1890: shutdown(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the thread factory used to create new threads. jaroslav@1890: * jaroslav@1890: * @param threadFactory the new thread factory jaroslav@1890: * @throws NullPointerException if threadFactory is null jaroslav@1890: * @see #getThreadFactory jaroslav@1890: */ jaroslav@1890: public void setThreadFactory(ThreadFactory threadFactory) { jaroslav@1890: if (threadFactory == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: this.threadFactory = threadFactory; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the thread factory used to create new threads. 
jaroslav@1890: * jaroslav@1890: * @return the current thread factory jaroslav@1890: * @see #setThreadFactory jaroslav@1890: */ jaroslav@1890: public ThreadFactory getThreadFactory() { jaroslav@1890: return threadFactory; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets a new handler for unexecutable tasks. jaroslav@1890: * jaroslav@1890: * @param handler the new handler jaroslav@1890: * @throws NullPointerException if handler is null jaroslav@1890: * @see #getRejectedExecutionHandler jaroslav@1890: */ jaroslav@1890: public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { jaroslav@1890: if (handler == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: this.handler = handler; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the current handler for unexecutable tasks. jaroslav@1890: * jaroslav@1890: * @return the current handler jaroslav@1890: * @see #setRejectedExecutionHandler jaroslav@1890: */ jaroslav@1890: public RejectedExecutionHandler getRejectedExecutionHandler() { jaroslav@1890: return handler; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the core number of threads. This overrides any value set jaroslav@1890: * in the constructor. If the new value is smaller than the jaroslav@1890: * current value, excess existing threads will be terminated when jaroslav@1890: * they next become idle. If larger, new threads will, if needed, jaroslav@1890: * be started to execute any queued tasks. 
jaroslav@1890: * jaroslav@1890: * @param corePoolSize the new core size jaroslav@1890: * @throws IllegalArgumentException if {@code corePoolSize < 0} jaroslav@1890: * @see #getCorePoolSize jaroslav@1890: */ jaroslav@1890: public void setCorePoolSize(int corePoolSize) { jaroslav@1890: if (corePoolSize < 0) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: int delta = corePoolSize - this.corePoolSize; jaroslav@1890: this.corePoolSize = corePoolSize; jaroslav@1890: if (workerCountOf(ctl.get()) > corePoolSize) jaroslav@1890: interruptIdleWorkers(); jaroslav@1890: else if (delta > 0) { jaroslav@1890: // We don't really know how many new threads are "needed". jaroslav@1890: // As a heuristic, prestart enough new workers (up to new jaroslav@1890: // core size) to handle the current number of tasks in jaroslav@1890: // queue, but stop if queue becomes empty while doing so. jaroslav@1890: int k = Math.min(delta, workQueue.size()); jaroslav@1890: while (k-- > 0 && addWorker(null, true)) { jaroslav@1890: if (workQueue.isEmpty()) jaroslav@1890: break; jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the core number of threads. jaroslav@1890: * jaroslav@1890: * @return the core number of threads jaroslav@1890: * @see #setCorePoolSize jaroslav@1890: */ jaroslav@1890: public int getCorePoolSize() { jaroslav@1890: return corePoolSize; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Starts a core thread, causing it to idly wait for work. This jaroslav@1890: * overrides the default policy of starting core threads only when jaroslav@1890: * new tasks are executed. This method will return {@code false} jaroslav@1890: * if all core threads have already been started. 
jaroslav@1890: * jaroslav@1890: * @return {@code true} if a thread was started jaroslav@1890: */ jaroslav@1890: public boolean prestartCoreThread() { jaroslav@1890: return workerCountOf(ctl.get()) < corePoolSize && jaroslav@1890: addWorker(null, true); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Starts all core threads, causing them to idly wait for work. This jaroslav@1890: * overrides the default policy of starting core threads only when jaroslav@1890: * new tasks are executed. jaroslav@1890: * jaroslav@1890: * @return the number of threads started jaroslav@1890: */ jaroslav@1890: public int prestartAllCoreThreads() { jaroslav@1890: int n = 0; jaroslav@1890: while (addWorker(null, true)) jaroslav@1890: ++n; jaroslav@1890: return n; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns true if this pool allows core threads to time out and jaroslav@1890: * terminate if no tasks arrive within the keepAlive time, being jaroslav@1890: * replaced if needed when new tasks arrive. When true, the same jaroslav@1890: * keep-alive policy applying to non-core threads applies also to jaroslav@1890: * core threads. When false (the default), core threads are never jaroslav@1890: * terminated due to lack of incoming tasks. jaroslav@1890: * jaroslav@1890: * @return {@code true} if core threads are allowed to time out, jaroslav@1890: * else {@code false} jaroslav@1890: * jaroslav@1890: * @since 1.6 jaroslav@1890: */ jaroslav@1890: public boolean allowsCoreThreadTimeOut() { jaroslav@1890: return allowCoreThreadTimeOut; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the policy governing whether core threads may time out and jaroslav@1890: * terminate if no tasks arrive within the keep-alive time, being jaroslav@1890: * replaced if needed when new tasks arrive. When false, core jaroslav@1890: * threads are never terminated due to lack of incoming jaroslav@1890: * tasks. 
When true, the same keep-alive policy applying to jaroslav@1890: * non-core threads applies also to core threads. To avoid jaroslav@1890: * continual thread replacement, the keep-alive time must be jaroslav@1890: * greater than zero when setting {@code true}. This method jaroslav@1890: * should in general be called before the pool is actively used. jaroslav@1890: * jaroslav@1890: * @param value {@code true} if should time out, else {@code false} jaroslav@1890: * @throws IllegalArgumentException if value is {@code true} jaroslav@1890: * and the current keep-alive time is not greater than zero jaroslav@1890: * jaroslav@1890: * @since 1.6 jaroslav@1890: */ jaroslav@1890: public void allowCoreThreadTimeOut(boolean value) { jaroslav@1890: if (value && keepAliveTime <= 0) jaroslav@1890: throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); jaroslav@1890: if (value != allowCoreThreadTimeOut) { jaroslav@1890: allowCoreThreadTimeOut = value; jaroslav@1890: if (value) jaroslav@1890: interruptIdleWorkers(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the maximum allowed number of threads. This overrides any jaroslav@1890: * value set in the constructor. If the new value is smaller than jaroslav@1890: * the current value, excess existing threads will be jaroslav@1890: * terminated when they next become idle. 
jaroslav@1890: * jaroslav@1890: * @param maximumPoolSize the new maximum jaroslav@1890: * @throws IllegalArgumentException if the new maximum is jaroslav@1890: * less than or equal to zero, or jaroslav@1890: * less than the {@linkplain #getCorePoolSize core pool size} jaroslav@1890: * @see #getMaximumPoolSize jaroslav@1890: */ jaroslav@1890: public void setMaximumPoolSize(int maximumPoolSize) { jaroslav@1890: if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: this.maximumPoolSize = maximumPoolSize; jaroslav@1890: if (workerCountOf(ctl.get()) > maximumPoolSize) jaroslav@1890: interruptIdleWorkers(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the maximum allowed number of threads. jaroslav@1890: * jaroslav@1890: * @return the maximum allowed number of threads jaroslav@1890: * @see #setMaximumPoolSize jaroslav@1890: */ jaroslav@1890: public int getMaximumPoolSize() { jaroslav@1890: return maximumPoolSize; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the time limit for which threads may remain idle before jaroslav@1890: * being terminated. If there are more than the core number of jaroslav@1890: * threads currently in the pool, after waiting this amount of jaroslav@1890: * time without processing a task, excess threads will be jaroslav@1890: * terminated. This overrides any value set in the constructor. jaroslav@1890: * jaroslav@1890: * @param time the time to wait. A time value of zero will cause jaroslav@1890: * excess threads to terminate immediately after executing tasks. 
jaroslav@1890: * @param unit the time unit of the {@code time} argument jaroslav@1890: * @throws IllegalArgumentException if {@code time} is less than zero, or jaroslav@1890: * if {@code time} is zero and {@code allowsCoreThreadTimeOut()} returns {@code true} jaroslav@1890: * @see #getKeepAliveTime jaroslav@1890: */ jaroslav@1890: public void setKeepAliveTime(long time, TimeUnit unit) { jaroslav@1890: if (time < 0) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: if (time == 0 && allowsCoreThreadTimeOut()) jaroslav@1890: throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); jaroslav@1890: long keepAliveTime = unit.toNanos(time); jaroslav@1890: long delta = keepAliveTime - this.keepAliveTime; jaroslav@1890: this.keepAliveTime = keepAliveTime; jaroslav@1890: if (delta < 0) jaroslav@1890: interruptIdleWorkers(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the thread keep-alive time, which is the amount of time jaroslav@1890: * that threads in excess of the core pool size may remain jaroslav@1890: * idle before being terminated. jaroslav@1890: * jaroslav@1890: * @param unit the desired time unit of the result jaroslav@1890: * @return the time limit jaroslav@1890: * @see #setKeepAliveTime jaroslav@1890: */ jaroslav@1890: public long getKeepAliveTime(TimeUnit unit) { jaroslav@1890: return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /* User-level queue utilities */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the task queue used by this executor. Access to the jaroslav@1890: * task queue is intended primarily for debugging and monitoring. jaroslav@1890: * This queue may be in active use. Retrieving the task queue jaroslav@1890: * does not prevent queued tasks from executing. 
jaroslav@1890: * jaroslav@1890: * @return the task queue jaroslav@1890: */ jaroslav@1890: public BlockingQueue<Runnable> getQueue() { jaroslav@1890: return workQueue; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Removes this task from the executor's internal queue if it is jaroslav@1890: * present, thus causing it not to be run if it has not already jaroslav@1890: * started. jaroslav@1890: * jaroslav@1890: *

<p>This method may be useful as one part of a cancellation jaroslav@1890: * scheme. It may fail to remove tasks that have been converted jaroslav@1890: * into other forms before being placed on the internal queue. For jaroslav@1890: * example, a task entered using {@code submit} might be jaroslav@1890: * converted into a form that maintains {@code Future} status. jaroslav@1890: * However, in such cases, method {@link #purge} may be used to jaroslav@1890: * remove those Futures that have been cancelled. jaroslav@1890: * jaroslav@1890: * @param task the task to remove jaroslav@1890: * @return true if the task was removed jaroslav@1890: */ jaroslav@1890: public boolean remove(Runnable task) { jaroslav@1890: boolean removed = workQueue.remove(task); jaroslav@1890: tryTerminate(); // In case SHUTDOWN and now empty jaroslav@1890: return removed; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Tries to remove from the work queue all {@link Future} jaroslav@1890: * tasks that have been cancelled. This method can be useful as a jaroslav@1890: * storage reclamation operation that has no other impact on jaroslav@1890: * functionality. Cancelled tasks are never executed, but may jaroslav@1890: * accumulate in work queues until worker threads can actively jaroslav@1890: * remove them. Invoking this method instead tries to remove them now. jaroslav@1890: * However, this method may fail to remove tasks in jaroslav@1890: * the presence of interference by other threads. 
jaroslav@1890: */ jaroslav@1890: public void purge() { jaroslav@1890: final BlockingQueue<Runnable> q = workQueue; jaroslav@1890: try { jaroslav@1890: Iterator<Runnable> it = q.iterator(); jaroslav@1890: while (it.hasNext()) { jaroslav@1890: Runnable r = it.next(); jaroslav@1890: if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) jaroslav@1890: it.remove(); jaroslav@1890: } jaroslav@1890: } catch (ConcurrentModificationException fallThrough) { jaroslav@1890: // Take slow path if we encounter interference during traversal. jaroslav@1890: // Make copy for traversal and call remove for cancelled entries. jaroslav@1890: // The slow path is more likely to be O(N*N). jaroslav@1890: for (Object r : q.toArray()) jaroslav@1890: if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) jaroslav@1890: q.remove(r); jaroslav@1890: } jaroslav@1890: jaroslav@1890: tryTerminate(); // In case SHUTDOWN and now empty jaroslav@1890: } jaroslav@1890: jaroslav@1890: /* Statistics */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the current number of threads in the pool. jaroslav@1890: * jaroslav@1890: * @return the number of threads jaroslav@1890: */ jaroslav@1890: public int getPoolSize() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: // Remove rare and surprising possibility of jaroslav@1890: // isTerminated() && getPoolSize() > 0 jaroslav@1890: return runStateAtLeast(ctl.get(), TIDYING) ? 0 jaroslav@1890: : workers.size(); jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the approximate number of threads that are actively jaroslav@1890: * executing tasks. 
jaroslav@1890: * jaroslav@1890: * @return the number of threads jaroslav@1890: */ jaroslav@1890: public int getActiveCount() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: int n = 0; jaroslav@1890: for (Worker w : workers) jaroslav@1890: if (w.isLocked()) jaroslav@1890: ++n; jaroslav@1890: return n; jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the largest number of threads that have ever jaroslav@1890: * simultaneously been in the pool. jaroslav@1890: * jaroslav@1890: * @return the number of threads jaroslav@1890: */ jaroslav@1890: public int getLargestPoolSize() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: return largestPoolSize; jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the approximate total number of tasks that have ever been jaroslav@1890: * scheduled for execution. Because the states of tasks and jaroslav@1890: * threads may change dynamically during computation, the returned jaroslav@1890: * value is only an approximation. 
jaroslav@1890: * jaroslav@1890: * @return the number of tasks jaroslav@1890: */ jaroslav@1890: public long getTaskCount() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: long n = completedTaskCount; jaroslav@1890: for (Worker w : workers) { jaroslav@1890: n += w.completedTasks; jaroslav@1890: if (w.isLocked()) jaroslav@1890: ++n; jaroslav@1890: } jaroslav@1890: return n + workQueue.size(); jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the approximate total number of tasks that have jaroslav@1890: * completed execution. Because the states of tasks and threads jaroslav@1890: * may change dynamically during computation, the returned value jaroslav@1890: * is only an approximation, but one that does not ever decrease jaroslav@1890: * across successive calls. jaroslav@1890: * jaroslav@1890: * @return the number of tasks jaroslav@1890: */ jaroslav@1890: public long getCompletedTaskCount() { jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: long n = completedTaskCount; jaroslav@1890: for (Worker w : workers) jaroslav@1890: n += w.completedTasks; jaroslav@1890: return n; jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns a string identifying this pool, as well as its state, jaroslav@1890: * including indications of run state and estimated worker and jaroslav@1890: * task counts. 
jaroslav@1890: * jaroslav@1890: * @return a string identifying this pool, as well as its state jaroslav@1890: */ jaroslav@1890: public String toString() { jaroslav@1890: long ncompleted; jaroslav@1890: int nworkers, nactive; jaroslav@1890: final ReentrantLock mainLock = this.mainLock; jaroslav@1890: mainLock.lock(); jaroslav@1890: try { jaroslav@1890: ncompleted = completedTaskCount; jaroslav@1890: nactive = 0; jaroslav@1890: nworkers = workers.size(); jaroslav@1890: for (Worker w : workers) { jaroslav@1890: ncompleted += w.completedTasks; jaroslav@1890: if (w.isLocked()) jaroslav@1890: ++nactive; jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: mainLock.unlock(); jaroslav@1890: } jaroslav@1890: int c = ctl.get(); jaroslav@1890: String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" : jaroslav@1890: (runStateAtLeast(c, TERMINATED) ? "Terminated" : jaroslav@1890: "Shutting down")); jaroslav@1890: return super.toString() + jaroslav@1890: "[" + rs + jaroslav@1890: ", pool size = " + nworkers + jaroslav@1890: ", active threads = " + nactive + jaroslav@1890: ", queued tasks = " + workQueue.size() + jaroslav@1890: ", completed tasks = " + ncompleted + jaroslav@1890: "]"; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /* Extension hooks */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Method invoked prior to executing the given Runnable in the jaroslav@1890: * given thread. This method is invoked by thread {@code t} that jaroslav@1890: * will execute task {@code r}, and may be used to re-initialize jaroslav@1890: * ThreadLocals, or to perform logging. jaroslav@1890: * jaroslav@1890: *

<p>This implementation does nothing, but may be customized in jaroslav@1890: * subclasses. Note: To properly nest multiple overridings, subclasses jaroslav@1890: * should generally invoke {@code super.beforeExecute} at the end of jaroslav@1890: * this method. jaroslav@1890: * jaroslav@1890: * @param t the thread that will run task {@code r} jaroslav@1890: * @param r the task that will be executed jaroslav@1890: */ jaroslav@1890: protected void beforeExecute(Thread t, Runnable r) { } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Method invoked upon completion of execution of the given Runnable. jaroslav@1890: * This method is invoked by the thread that executed the task. If jaroslav@1890: * non-null, the Throwable is the uncaught {@code RuntimeException} jaroslav@1890: * or {@code Error} that caused execution to terminate abruptly. jaroslav@1890: * jaroslav@1890: *

<p>This implementation does nothing, but may be customized in jaroslav@1890: * subclasses. Note: To properly nest multiple overridings, subclasses jaroslav@1890: * should generally invoke {@code super.afterExecute} at the jaroslav@1890: * beginning of this method. jaroslav@1890: * jaroslav@1890: *

<p>Note: When actions are enclosed in tasks (such as jaroslav@1890: * {@link FutureTask}) either explicitly or via methods such as jaroslav@1890: * {@code submit}, these task objects catch and maintain jaroslav@1890: * computational exceptions, and so they do not cause abrupt jaroslav@1890: * termination, and the internal exceptions are not jaroslav@1890: * passed to this method. If you would like to trap both kinds of jaroslav@1890: * failures in this method, you can further probe for such cases, jaroslav@1890: * as in this sample subclass that prints either the direct cause jaroslav@1890: * or the underlying exception if a task has been aborted: jaroslav@1890: * jaroslav@1890: *

 <pre> {@code
jaroslav@1890:      * class ExtendedExecutor extends ThreadPoolExecutor {
jaroslav@1890:      *   // ...
jaroslav@1890:      *   protected void afterExecute(Runnable r, Throwable t) {
jaroslav@1890:      *     super.afterExecute(r, t);
jaroslav@1890:      *     if (t == null && r instanceof Future<?>) {
jaroslav@1890:      *       try {
jaroslav@1890:      *         Object result = ((Future<?>) r).get();
jaroslav@1890:      *       } catch (CancellationException ce) {
jaroslav@1890:      *           t = ce;
jaroslav@1890:      *       } catch (ExecutionException ee) {
jaroslav@1890:      *           t = ee.getCause();
jaroslav@1890:      *       } catch (InterruptedException ie) {
jaroslav@1890:      *           Thread.currentThread().interrupt(); // ignore/reset
jaroslav@1890:      *       }
jaroslav@1890:      *     }
jaroslav@1890:      *     if (t != null)
jaroslav@1890:      *       System.out.println(t);
jaroslav@1890:      *   }
jaroslav@1890:      * }}</pre>
jaroslav@1890: * jaroslav@1890: * @param r the runnable that has completed jaroslav@1890: * @param t the exception that caused termination, or null if jaroslav@1890: * execution completed normally jaroslav@1890: */ jaroslav@1890: protected void afterExecute(Runnable r, Throwable t) { } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Method invoked when the Executor has terminated. Default jaroslav@1890: * implementation does nothing. Note: To properly nest multiple jaroslav@1890: * overridings, subclasses should generally invoke jaroslav@1890: * {@code super.terminated} within this method. jaroslav@1890: */ jaroslav@1890: protected void terminated() { } jaroslav@1890: jaroslav@1890: /* Predefined RejectedExecutionHandlers */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * A handler for rejected tasks that runs the rejected task jaroslav@1890: * directly in the calling thread of the {@code execute} method, jaroslav@1890: * unless the executor has been shut down, in which case the task jaroslav@1890: * is discarded. jaroslav@1890: */ jaroslav@1890: public static class CallerRunsPolicy implements RejectedExecutionHandler { jaroslav@1890: /** jaroslav@1890: * Creates a {@code CallerRunsPolicy}. jaroslav@1890: */ jaroslav@1890: public CallerRunsPolicy() { } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Executes task r in the caller's thread, unless the executor jaroslav@1890: * has been shut down, in which case the task is discarded. jaroslav@1890: * jaroslav@1890: * @param r the runnable task requested to be executed jaroslav@1890: * @param e the executor attempting to execute this task jaroslav@1890: */ jaroslav@1890: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { jaroslav@1890: if (!e.isShutdown()) { jaroslav@1890: r.run(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * A handler for rejected tasks that throws a jaroslav@1890: * {@code RejectedExecutionException}. 
jaroslav@1890: */ jaroslav@1890: public static class AbortPolicy implements RejectedExecutionHandler { jaroslav@1890: /** jaroslav@1890: * Creates an {@code AbortPolicy}. jaroslav@1890: */ jaroslav@1890: public AbortPolicy() { } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Always throws RejectedExecutionException. jaroslav@1890: * jaroslav@1890: * @param r the runnable task requested to be executed jaroslav@1890: * @param e the executor attempting to execute this task jaroslav@1890: * @throws RejectedExecutionException always. jaroslav@1890: */ jaroslav@1890: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { jaroslav@1890: throw new RejectedExecutionException("Task " + r.toString() + jaroslav@1890: " rejected from " + jaroslav@1890: e.toString()); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * A handler for rejected tasks that silently discards the jaroslav@1890: * rejected task. jaroslav@1890: */ jaroslav@1890: public static class DiscardPolicy implements RejectedExecutionHandler { jaroslav@1890: /** jaroslav@1890: * Creates a {@code DiscardPolicy}. jaroslav@1890: */ jaroslav@1890: public DiscardPolicy() { } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Does nothing, which has the effect of discarding task r. jaroslav@1890: * jaroslav@1890: * @param r the runnable task requested to be executed jaroslav@1890: * @param e the executor attempting to execute this task jaroslav@1890: */ jaroslav@1890: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * A handler for rejected tasks that discards the oldest unhandled jaroslav@1890: * request and then retries {@code execute}, unless the executor jaroslav@1890: * is shut down, in which case the task is discarded. 
jaroslav@1890: */ jaroslav@1890: public static class DiscardOldestPolicy implements RejectedExecutionHandler { jaroslav@1890: /** jaroslav@1890: * Creates a {@code DiscardOldestPolicy}. jaroslav@1890: */ jaroslav@1890: public DiscardOldestPolicy() { } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Obtains and ignores the next task that the executor jaroslav@1890: * would otherwise execute, if one is immediately available, jaroslav@1890: * and then retries execution of task r, unless the executor jaroslav@1890: * is shut down, in which case task r is instead discarded. jaroslav@1890: * jaroslav@1890: * @param r the runnable task requested to be executed jaroslav@1890: * @param e the executor attempting to execute this task jaroslav@1890: */ jaroslav@1890: public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { jaroslav@1890: if (!e.isShutdown()) { jaroslav@1890: e.getQueue().poll(); jaroslav@1890: e.execute(r); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: }
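The three steps described in the comment inside `execute` can be observed from outside the class. The following is a minimal sketch, not part of the file above: the class name `ExecuteStepsDemo` and the method `run` are hypothetical, and the pool sizes (core 1, maximum 2, queue capacity 1) are assumptions chosen only to make each step visible. Every task blocks on a latch, so no worker becomes idle while the demo is submitting work.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only uses the public API of ThreadPoolExecutor.
public class ExecuteStepsDemo {

    /** Submits four blocking tasks and records what execute() did with each. */
    static String run() {
        final CountDownLatch release = new CountDownLatch(1);
        // Each task parks on the latch until the demo releases it.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // Assumed configuration: 1 core thread, 2 threads at most, 1 queue slot.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.AbortPolicy());
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker);   // step 1: below corePoolSize, starts a thread
            log.append("threads=").append(pool.getPoolSize());
            pool.execute(blocker);   // step 2: core is busy, task is queued
            log.append(" queued=").append(pool.getQueue().size());
            pool.execute(blocker);   // step 3: queue is full, starts a non-core thread
            log.append(" threads=").append(pool.getPoolSize());
            try {
                pool.execute(blocker);   // queue full and pool at maximum
                log.append(" accepted");
            } catch (RejectedExecutionException e) {
                log.append(" rejected"); // AbortPolicy throws
            }
        } finally {
            release.countDown();     // let all blocked tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Under these assumptions the program prints `threads=1 queued=1 threads=2 rejected`. Replacing `AbortPolicy` with one of the other predefined handlers changes only the outcome of the fourth submission.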