jaroslav@1890: /* jaroslav@1890: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. jaroslav@1890: * jaroslav@1890: * This code is free software; you can redistribute it and/or modify it jaroslav@1890: * under the terms of the GNU General Public License version 2 only, as jaroslav@1890: * published by the Free Software Foundation. Oracle designates this jaroslav@1890: * particular file as subject to the "Classpath" exception as provided jaroslav@1890: * by Oracle in the LICENSE file that accompanied this code. jaroslav@1890: * jaroslav@1890: * This code is distributed in the hope that it will be useful, but WITHOUT jaroslav@1890: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or jaroslav@1890: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License jaroslav@1890: * version 2 for more details (a copy is included in the LICENSE file that jaroslav@1890: * accompanied this code). jaroslav@1890: * jaroslav@1890: * You should have received a copy of the GNU General Public License version jaroslav@1890: * 2 along with this work; if not, write to the Free Software Foundation, jaroslav@1890: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. jaroslav@1890: * jaroslav@1890: * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA jaroslav@1890: * or visit www.oracle.com if you need additional information or have any jaroslav@1890: * questions. jaroslav@1890: */ jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * This file is available under and governed by the GNU General Public jaroslav@1890: * License version 2 only, as published by the Free Software Foundation. jaroslav@1890: * However, the following notice accompanied the original version of this jaroslav@1890: * file: jaroslav@1890: * jaroslav@1890: * Written by Doug Lea with assistance from members of JCP JSR-166 jaroslav@1890: * Expert Group and released to the public domain, as explained at jaroslav@1890: * http://creativecommons.org/publicdomain/zero/1.0/ jaroslav@1890: */ jaroslav@1890: jaroslav@1890: package java.util.concurrent; jaroslav@1890: import java.util.concurrent.atomic.*; jaroslav@1890: import java.util.concurrent.locks.*; jaroslav@1890: import java.util.*; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * A {@link ThreadPoolExecutor} that can additionally schedule jaroslav@1890: * commands to run after a given delay, or to execute jaroslav@1890: * periodically. This class is preferable to {@link java.util.Timer} jaroslav@1890: * when multiple worker threads are needed, or when the additional jaroslav@1890: * flexibility or capabilities of {@link ThreadPoolExecutor} (which jaroslav@1890: * this class extends) are required. jaroslav@1890: * jaroslav@1890: *

Delayed tasks execute no sooner than they are enabled, but jaroslav@1890: * without any real-time guarantees about when, after they are jaroslav@1890: * enabled, they will commence. Tasks scheduled for exactly the same jaroslav@1890: * execution time are enabled in first-in-first-out (FIFO) order of jaroslav@1890: * submission. jaroslav@1890: * jaroslav@1890: *

When a submitted task is cancelled before it is run, execution jaroslav@1890: * is suppressed. By default, such a cancelled task is not jaroslav@1890: * automatically removed from the work queue until its delay jaroslav@1890: * elapses. While this enables further inspection and monitoring, it jaroslav@1890: * may also cause unbounded retention of cancelled tasks. To avoid jaroslav@1890: * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which jaroslav@1890: * causes tasks to be immediately removed from the work queue at jaroslav@1890: * time of cancellation. jaroslav@1890: * jaroslav@1890: *

Successive executions of a task scheduled via jaroslav@1890: * {@code scheduleAtFixedRate} or jaroslav@1890: * {@code scheduleWithFixedDelay} do not overlap. While different jaroslav@1890: * executions may be performed by different threads, the effects of jaroslav@1890: * prior executions happen-before jaroslav@1890: * those of subsequent ones. jaroslav@1890: * jaroslav@1890: *

While this class inherits from {@link ThreadPoolExecutor}, a few jaroslav@1890: * of the inherited tuning methods are not useful for it. In jaroslav@1890: * particular, because it acts as a fixed-sized pool using jaroslav@1890: * {@code corePoolSize} threads and an unbounded queue, adjustments jaroslav@1890: * to {@code maximumPoolSize} have no useful effect. Additionally, it jaroslav@1890: * is almost never a good idea to set {@code corePoolSize} to zero or jaroslav@1890: * use {@code allowCoreThreadTimeOut} because this may leave the pool jaroslav@1890: * without threads to handle tasks once they become eligible to run. jaroslav@1890: * jaroslav@1890: *

Extension notes: This class overrides the jaroslav@1890: * {@link ThreadPoolExecutor#execute execute} and jaroslav@1890: * {@link AbstractExecutorService#submit(Runnable) submit} jaroslav@1890: * methods to generate internal {@link ScheduledFuture} objects to jaroslav@1890: * control per-task delays and scheduling. To preserve jaroslav@1890: * functionality, any further overrides of these methods in jaroslav@1890: * subclasses must invoke superclass versions, which effectively jaroslav@1890: * disables additional task customization. However, this class jaroslav@1890: * provides alternative protected extension method jaroslav@1890: * {@code decorateTask} (one version each for {@code Runnable} and jaroslav@1890: * {@code Callable}) that can be used to customize the concrete task jaroslav@1890: * types used to execute commands entered via {@code execute}, jaroslav@1890: * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, jaroslav@1890: * and {@code scheduleWithFixedDelay}. By default, a jaroslav@1890: * {@code ScheduledThreadPoolExecutor} uses a task type extending jaroslav@1890: * {@link FutureTask}. However, this may be modified or replaced using jaroslav@1890: * subclasses of the form: jaroslav@1890: * jaroslav@1890: *

 {@code
jaroslav@1890:  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
jaroslav@1890:  *
jaroslav@1890:  *   static class CustomTask implements RunnableScheduledFuture { ... }
jaroslav@1890:  *
jaroslav@1890:  *   protected  RunnableScheduledFuture decorateTask(
jaroslav@1890:  *                Runnable r, RunnableScheduledFuture task) {
jaroslav@1890:  *       return new CustomTask(r, task);
jaroslav@1890:  *   }
jaroslav@1890:  *
jaroslav@1890:  *   protected  RunnableScheduledFuture decorateTask(
jaroslav@1890:  *                Callable c, RunnableScheduledFuture task) {
jaroslav@1890:  *       return new CustomTask(c, task);
jaroslav@1890:  *   }
jaroslav@1890:  *   // ... add constructors, etc.
jaroslav@1890:  * }}
jaroslav@1890: * jaroslav@1890: * @since 1.5 jaroslav@1890: * @author Doug Lea jaroslav@1890: */ jaroslav@1890: public class ScheduledThreadPoolExecutor jaroslav@1890: extends ThreadPoolExecutor jaroslav@1890: implements ScheduledExecutorService { jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * This class specializes ThreadPoolExecutor implementation by jaroslav@1890: * jaroslav@1890: * 1. Using a custom task type, ScheduledFutureTask for jaroslav@1890: * tasks, even those that don't require scheduling (i.e., jaroslav@1890: * those submitted using ExecutorService execute, not jaroslav@1890: * ScheduledExecutorService methods) which are treated as jaroslav@1890: * delayed tasks with a delay of zero. jaroslav@1890: * jaroslav@1890: * 2. Using a custom queue (DelayedWorkQueue), a variant of jaroslav@1890: * unbounded DelayQueue. The lack of capacity constraint and jaroslav@1890: * the fact that corePoolSize and maximumPoolSize are jaroslav@1890: * effectively identical simplifies some execution mechanics jaroslav@1890: * (see delayedExecute) compared to ThreadPoolExecutor. jaroslav@1890: * jaroslav@1890: * 3. Supporting optional run-after-shutdown parameters, which jaroslav@1890: * leads to overrides of shutdown methods to remove and cancel jaroslav@1890: * tasks that should NOT be run after shutdown, as well as jaroslav@1890: * different recheck logic when task (re)submission overlaps jaroslav@1890: * with a shutdown. jaroslav@1890: * jaroslav@1890: * 4. Task decoration methods to allow interception and jaroslav@1890: * instrumentation, which are needed because subclasses cannot jaroslav@1890: * otherwise override submit methods to get this effect. These jaroslav@1890: * don't have any impact on pool control logic though. jaroslav@1890: */ jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * False if should cancel/suppress periodic tasks on shutdown. jaroslav@1890: */ jaroslav@1890: private volatile boolean continueExistingPeriodicTasksAfterShutdown; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * False if should cancel non-periodic tasks on shutdown. jaroslav@1890: */ jaroslav@1890: private volatile boolean executeExistingDelayedTasksAfterShutdown = true; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * True if ScheduledFutureTask.cancel should remove from queue jaroslav@1890: */ jaroslav@1890: private volatile boolean removeOnCancel = false; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sequence number to break scheduling ties, and in turn to jaroslav@1890: * guarantee FIFO order among tied entries. jaroslav@1890: */ jaroslav@1890: private static final AtomicLong sequencer = new AtomicLong(0); jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns current nanosecond time. jaroslav@1890: */ jaroslav@1890: final long now() { jaroslav@1890: return System.nanoTime(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: private class ScheduledFutureTask jaroslav@1890: extends FutureTask implements RunnableScheduledFuture { jaroslav@1890: jaroslav@1890: /** Sequence number to break ties FIFO */ jaroslav@1890: private final long sequenceNumber; jaroslav@1890: jaroslav@1890: /** The time the task is enabled to execute in nanoTime units */ jaroslav@1890: private long time; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Period in nanoseconds for repeating tasks. A positive jaroslav@1890: * value indicates fixed-rate execution. A negative value jaroslav@1890: * indicates fixed-delay execution. A value of 0 indicates a jaroslav@1890: * non-repeating task. jaroslav@1890: */ jaroslav@1890: private final long period; jaroslav@1890: jaroslav@1890: /** The actual task to be re-enqueued by reExecutePeriodic */ jaroslav@1890: RunnableScheduledFuture outerTask = this; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Index into delay queue, to support faster cancellation. jaroslav@1890: */ jaroslav@1890: int heapIndex; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a one-shot action with given nanoTime-based trigger time. jaroslav@1890: */ jaroslav@1890: ScheduledFutureTask(Runnable r, V result, long ns) { jaroslav@1890: super(r, result); jaroslav@1890: this.time = ns; jaroslav@1890: this.period = 0; jaroslav@1890: this.sequenceNumber = sequencer.getAndIncrement(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a periodic action with given nano time and period. jaroslav@1890: */ jaroslav@1890: ScheduledFutureTask(Runnable r, V result, long ns, long period) { jaroslav@1890: super(r, result); jaroslav@1890: this.time = ns; jaroslav@1890: this.period = period; jaroslav@1890: this.sequenceNumber = sequencer.getAndIncrement(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a one-shot action with given nanoTime-based trigger. jaroslav@1890: */ jaroslav@1890: ScheduledFutureTask(Callable callable, long ns) { jaroslav@1890: super(callable); jaroslav@1890: this.time = ns; jaroslav@1890: this.period = 0; jaroslav@1890: this.sequenceNumber = sequencer.getAndIncrement(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public long getDelay(TimeUnit unit) { jaroslav@1890: return unit.convert(time - now(), TimeUnit.NANOSECONDS); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public int compareTo(Delayed other) { jaroslav@1890: if (other == this) // compare zero ONLY if same object jaroslav@1890: return 0; jaroslav@1890: if (other instanceof ScheduledFutureTask) { jaroslav@1890: ScheduledFutureTask x = (ScheduledFutureTask)other; jaroslav@1890: long diff = time - x.time; jaroslav@1890: if (diff < 0) jaroslav@1890: return -1; jaroslav@1890: else if (diff > 0) jaroslav@1890: return 1; jaroslav@1890: else if (sequenceNumber < x.sequenceNumber) jaroslav@1890: return -1; jaroslav@1890: else jaroslav@1890: return 1; jaroslav@1890: } jaroslav@1890: long d = (getDelay(TimeUnit.NANOSECONDS) - jaroslav@1890: other.getDelay(TimeUnit.NANOSECONDS)); jaroslav@1890: return (d == 0) ? 0 : ((d < 0) ? -1 : 1); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns true if this is a periodic (not a one-shot) action. jaroslav@1890: * jaroslav@1890: * @return true if periodic jaroslav@1890: */ jaroslav@1890: public boolean isPeriodic() { jaroslav@1890: return period != 0; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the next time to run for a periodic task. jaroslav@1890: */ jaroslav@1890: private void setNextRunTime() { jaroslav@1890: long p = period; jaroslav@1890: if (p > 0) jaroslav@1890: time += p; jaroslav@1890: else jaroslav@1890: time = triggerTime(-p); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean cancel(boolean mayInterruptIfRunning) { jaroslav@1890: boolean cancelled = super.cancel(mayInterruptIfRunning); jaroslav@1890: if (cancelled && removeOnCancel && heapIndex >= 0) jaroslav@1890: remove(this); jaroslav@1890: return cancelled; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Overrides FutureTask version so as to reset/requeue if periodic. jaroslav@1890: */ jaroslav@1890: public void run() { jaroslav@1890: boolean periodic = isPeriodic(); jaroslav@1890: if (!canRunInCurrentRunState(periodic)) jaroslav@1890: cancel(false); jaroslav@1890: else if (!periodic) jaroslav@1890: ScheduledFutureTask.super.run(); jaroslav@1890: else if (ScheduledFutureTask.super.runAndReset()) { jaroslav@1890: setNextRunTime(); jaroslav@1890: reExecutePeriodic(outerTask); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns true if can run a task given current run state jaroslav@1890: * and run-after-shutdown parameters. jaroslav@1890: * jaroslav@1890: * @param periodic true if this task periodic, false if delayed jaroslav@1890: */ jaroslav@1890: boolean canRunInCurrentRunState(boolean periodic) { jaroslav@1890: return isRunningOrShutdown(periodic ? jaroslav@1890: continueExistingPeriodicTasksAfterShutdown : jaroslav@1890: executeExistingDelayedTasksAfterShutdown); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Main execution method for delayed or periodic tasks. If pool jaroslav@1890: * is shut down, rejects the task. Otherwise adds task to queue jaroslav@1890: * and starts a thread, if necessary, to run it. (We cannot jaroslav@1890: * prestart the thread to run the task because the task (probably) jaroslav@1890: * shouldn't be run yet,) If the pool is shut down while the task jaroslav@1890: * is being added, cancel and remove it if required by state and jaroslav@1890: * run-after-shutdown parameters. jaroslav@1890: * jaroslav@1890: * @param task the task jaroslav@1890: */ jaroslav@1890: private void delayedExecute(RunnableScheduledFuture task) { jaroslav@1890: if (isShutdown()) jaroslav@1890: reject(task); jaroslav@1890: else { jaroslav@1890: super.getQueue().add(task); jaroslav@1890: if (isShutdown() && jaroslav@1890: !canRunInCurrentRunState(task.isPeriodic()) && jaroslav@1890: remove(task)) jaroslav@1890: task.cancel(false); jaroslav@1890: else jaroslav@1890: prestartCoreThread(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Requeues a periodic task unless current run state precludes it. jaroslav@1890: * Same idea as delayedExecute except drops task rather than rejecting. jaroslav@1890: * jaroslav@1890: * @param task the task jaroslav@1890: */ jaroslav@1890: void reExecutePeriodic(RunnableScheduledFuture task) { jaroslav@1890: if (canRunInCurrentRunState(true)) { jaroslav@1890: super.getQueue().add(task); jaroslav@1890: if (!canRunInCurrentRunState(true) && remove(task)) jaroslav@1890: task.cancel(false); jaroslav@1890: else jaroslav@1890: prestartCoreThread(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Cancels and clears the queue of all tasks that should not be run jaroslav@1890: * due to shutdown policy. Invoked within super.shutdown. jaroslav@1890: */ jaroslav@1890: @Override void onShutdown() { jaroslav@1890: BlockingQueue q = super.getQueue(); jaroslav@1890: boolean keepDelayed = jaroslav@1890: getExecuteExistingDelayedTasksAfterShutdownPolicy(); jaroslav@1890: boolean keepPeriodic = jaroslav@1890: getContinueExistingPeriodicTasksAfterShutdownPolicy(); jaroslav@1890: if (!keepDelayed && !keepPeriodic) { jaroslav@1890: for (Object e : q.toArray()) jaroslav@1890: if (e instanceof RunnableScheduledFuture) jaroslav@1890: ((RunnableScheduledFuture) e).cancel(false); jaroslav@1890: q.clear(); jaroslav@1890: } jaroslav@1890: else { jaroslav@1890: // Traverse snapshot to avoid iterator exceptions jaroslav@1890: for (Object e : q.toArray()) { jaroslav@1890: if (e instanceof RunnableScheduledFuture) { jaroslav@1890: RunnableScheduledFuture t = jaroslav@1890: (RunnableScheduledFuture)e; jaroslav@1890: if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || jaroslav@1890: t.isCancelled()) { // also remove if already cancelled jaroslav@1890: if (q.remove(t)) jaroslav@1890: t.cancel(false); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: tryTerminate(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Modifies or replaces the task used to execute a runnable. jaroslav@1890: * This method can be used to override the concrete jaroslav@1890: * class used for managing internal tasks. jaroslav@1890: * The default implementation simply returns the given task. jaroslav@1890: * jaroslav@1890: * @param runnable the submitted Runnable jaroslav@1890: * @param task the task created to execute the runnable jaroslav@1890: * @return a task that can execute the runnable jaroslav@1890: * @since 1.6 jaroslav@1890: */ jaroslav@1890: protected RunnableScheduledFuture decorateTask( jaroslav@1890: Runnable runnable, RunnableScheduledFuture task) { jaroslav@1890: return task; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Modifies or replaces the task used to execute a callable. jaroslav@1890: * This method can be used to override the concrete jaroslav@1890: * class used for managing internal tasks. jaroslav@1890: * The default implementation simply returns the given task. jaroslav@1890: * jaroslav@1890: * @param callable the submitted Callable jaroslav@1890: * @param task the task created to execute the callable jaroslav@1890: * @return a task that can execute the callable jaroslav@1890: * @since 1.6 jaroslav@1890: */ jaroslav@1890: protected RunnableScheduledFuture decorateTask( jaroslav@1890: Callable callable, RunnableScheduledFuture task) { jaroslav@1890: return task; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new {@code ScheduledThreadPoolExecutor} with the jaroslav@1890: * given core pool size. jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @throws IllegalArgumentException if {@code corePoolSize < 0} jaroslav@1890: */ jaroslav@1890: public ScheduledThreadPoolExecutor(int corePoolSize) { jaroslav@1890: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, jaroslav@1890: new DelayedWorkQueue()); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new {@code ScheduledThreadPoolExecutor} with the jaroslav@1890: * given initial parameters. jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @param threadFactory the factory to use when the executor jaroslav@1890: * creates a new thread jaroslav@1890: * @throws IllegalArgumentException if {@code corePoolSize < 0} jaroslav@1890: * @throws NullPointerException if {@code threadFactory} is null jaroslav@1890: */ jaroslav@1890: public ScheduledThreadPoolExecutor(int corePoolSize, jaroslav@1890: ThreadFactory threadFactory) { jaroslav@1890: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, jaroslav@1890: new DelayedWorkQueue(), threadFactory); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new ScheduledThreadPoolExecutor with the given jaroslav@1890: * initial parameters. jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @param handler the handler to use when execution is blocked jaroslav@1890: * because the thread bounds and queue capacities are reached jaroslav@1890: * @throws IllegalArgumentException if {@code corePoolSize < 0} jaroslav@1890: * @throws NullPointerException if {@code handler} is null jaroslav@1890: */ jaroslav@1890: public ScheduledThreadPoolExecutor(int corePoolSize, jaroslav@1890: RejectedExecutionHandler handler) { jaroslav@1890: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, jaroslav@1890: new DelayedWorkQueue(), handler); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Creates a new ScheduledThreadPoolExecutor with the given jaroslav@1890: * initial parameters. jaroslav@1890: * jaroslav@1890: * @param corePoolSize the number of threads to keep in the pool, even jaroslav@1890: * if they are idle, unless {@code allowCoreThreadTimeOut} is set jaroslav@1890: * @param threadFactory the factory to use when the executor jaroslav@1890: * creates a new thread jaroslav@1890: * @param handler the handler to use when execution is blocked jaroslav@1890: * because the thread bounds and queue capacities are reached jaroslav@1890: * @throws IllegalArgumentException if {@code corePoolSize < 0} jaroslav@1890: * @throws NullPointerException if {@code threadFactory} or jaroslav@1890: * {@code handler} is null jaroslav@1890: */ jaroslav@1890: public ScheduledThreadPoolExecutor(int corePoolSize, jaroslav@1890: ThreadFactory threadFactory, jaroslav@1890: RejectedExecutionHandler handler) { jaroslav@1890: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, jaroslav@1890: new DelayedWorkQueue(), threadFactory, handler); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the trigger time of a delayed action. jaroslav@1890: */ jaroslav@1890: private long triggerTime(long delay, TimeUnit unit) { jaroslav@1890: return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the trigger time of a delayed action. jaroslav@1890: */ jaroslav@1890: long triggerTime(long delay) { jaroslav@1890: return now() + jaroslav@1890: ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Constrains the values of all delays in the queue to be within jaroslav@1890: * Long.MAX_VALUE of each other, to avoid overflow in compareTo. jaroslav@1890: * This may occur if a task is eligible to be dequeued, but has jaroslav@1890: * not yet been, while some other task is added with a delay of jaroslav@1890: * Long.MAX_VALUE. jaroslav@1890: */ jaroslav@1890: private long overflowFree(long delay) { jaroslav@1890: Delayed head = (Delayed) super.getQueue().peek(); jaroslav@1890: if (head != null) { jaroslav@1890: long headDelay = head.getDelay(TimeUnit.NANOSECONDS); jaroslav@1890: if (headDelay < 0 && (delay - headDelay < 0)) jaroslav@1890: delay = Long.MAX_VALUE + headDelay; jaroslav@1890: } jaroslav@1890: return delay; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc} jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public ScheduledFuture schedule(Runnable command, jaroslav@1890: long delay, jaroslav@1890: TimeUnit unit) { jaroslav@1890: if (command == null || unit == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: RunnableScheduledFuture t = decorateTask(command, jaroslav@1890: new ScheduledFutureTask(command, null, jaroslav@1890: triggerTime(delay, unit))); jaroslav@1890: delayedExecute(t); jaroslav@1890: return t; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc} jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public ScheduledFuture schedule(Callable callable, jaroslav@1890: long delay, jaroslav@1890: TimeUnit unit) { jaroslav@1890: if (callable == null || unit == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: RunnableScheduledFuture t = decorateTask(callable, jaroslav@1890: new ScheduledFutureTask(callable, jaroslav@1890: triggerTime(delay, unit))); jaroslav@1890: delayedExecute(t); jaroslav@1890: return t; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc} jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: * @throws IllegalArgumentException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public ScheduledFuture scheduleAtFixedRate(Runnable command, jaroslav@1890: long initialDelay, jaroslav@1890: long period, jaroslav@1890: TimeUnit unit) { jaroslav@1890: if (command == null || unit == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: if (period <= 0) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: ScheduledFutureTask sft = jaroslav@1890: new ScheduledFutureTask(command, jaroslav@1890: null, jaroslav@1890: triggerTime(initialDelay, unit), jaroslav@1890: unit.toNanos(period)); jaroslav@1890: RunnableScheduledFuture t = decorateTask(command, sft); jaroslav@1890: sft.outerTask = t; jaroslav@1890: delayedExecute(t); jaroslav@1890: return t; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc} jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: * @throws IllegalArgumentException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public ScheduledFuture scheduleWithFixedDelay(Runnable command, jaroslav@1890: long initialDelay, jaroslav@1890: long delay, jaroslav@1890: TimeUnit unit) { jaroslav@1890: if (command == null || unit == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: if (delay <= 0) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: ScheduledFutureTask sft = jaroslav@1890: new ScheduledFutureTask(command, jaroslav@1890: null, jaroslav@1890: triggerTime(initialDelay, unit), jaroslav@1890: unit.toNanos(-delay)); jaroslav@1890: RunnableScheduledFuture t = decorateTask(command, sft); jaroslav@1890: sft.outerTask = t; jaroslav@1890: delayedExecute(t); jaroslav@1890: return t; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Executes {@code command} with zero required delay. jaroslav@1890: * This has effect equivalent to jaroslav@1890: * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. jaroslav@1890: * Note that inspections of the queue and of the list returned by jaroslav@1890: * {@code shutdownNow} will access the zero-delayed jaroslav@1890: * {@link ScheduledFuture}, not the {@code command} itself. jaroslav@1890: * jaroslav@1890: *

A consequence of the use of {@code ScheduledFuture} objects is jaroslav@1890: * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always jaroslav@1890: * called with a null second {@code Throwable} argument, even if the jaroslav@1890: * {@code command} terminated abruptly. Instead, the {@code Throwable} jaroslav@1890: * thrown by such a task can be obtained via {@link Future#get}. jaroslav@1890: * jaroslav@1890: * @throws RejectedExecutionException at discretion of jaroslav@1890: * {@code RejectedExecutionHandler}, if the task jaroslav@1890: * cannot be accepted for execution because the jaroslav@1890: * executor has been shut down jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public void execute(Runnable command) { jaroslav@1890: schedule(command, 0, TimeUnit.NANOSECONDS); jaroslav@1890: } jaroslav@1890: jaroslav@1890: // Override AbstractExecutorService methods jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc} jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public Future submit(Runnable task) { jaroslav@1890: return schedule(task, 0, TimeUnit.NANOSECONDS); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc} jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public Future submit(Runnable task, T result) { jaroslav@1890: return schedule(Executors.callable(task, result), jaroslav@1890: 0, TimeUnit.NANOSECONDS); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc} jaroslav@1890: * @throws NullPointerException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public Future submit(Callable task) { jaroslav@1890: return schedule(task, 0, TimeUnit.NANOSECONDS); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the policy on whether to continue executing existing jaroslav@1890: * periodic tasks even when this executor has been {@code shutdown}. jaroslav@1890: * In this case, these tasks will only terminate upon jaroslav@1890: * {@code shutdownNow} or after setting the policy to jaroslav@1890: * {@code false} when already shutdown. jaroslav@1890: * This value is by default {@code false}. jaroslav@1890: * jaroslav@1890: * @param value if {@code true}, continue after shutdown, else don't. jaroslav@1890: * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy jaroslav@1890: */ jaroslav@1890: public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { jaroslav@1890: continueExistingPeriodicTasksAfterShutdown = value; jaroslav@1890: if (!value && isShutdown()) jaroslav@1890: onShutdown(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Gets the policy on whether to continue executing existing jaroslav@1890: * periodic tasks even when this executor has been {@code shutdown}. jaroslav@1890: * In this case, these tasks will only terminate upon jaroslav@1890: * {@code shutdownNow} or after setting the policy to jaroslav@1890: * {@code false} when already shutdown. jaroslav@1890: * This value is by default {@code false}. jaroslav@1890: * jaroslav@1890: * @return {@code true} if will continue after shutdown jaroslav@1890: * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy jaroslav@1890: */ jaroslav@1890: public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { jaroslav@1890: return continueExistingPeriodicTasksAfterShutdown; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the policy on whether to execute existing delayed jaroslav@1890: * tasks even when this executor has been {@code shutdown}. jaroslav@1890: * In this case, these tasks will only terminate upon jaroslav@1890: * {@code shutdownNow}, or after setting the policy to jaroslav@1890: * {@code false} when already shutdown. jaroslav@1890: * This value is by default {@code true}. jaroslav@1890: * jaroslav@1890: * @param value if {@code true}, execute after shutdown, else don't. jaroslav@1890: * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy jaroslav@1890: */ jaroslav@1890: public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { jaroslav@1890: executeExistingDelayedTasksAfterShutdown = value; jaroslav@1890: if (!value && isShutdown()) jaroslav@1890: onShutdown(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Gets the policy on whether to execute existing delayed jaroslav@1890: * tasks even when this executor has been {@code shutdown}. jaroslav@1890: * In this case, these tasks will only terminate upon jaroslav@1890: * {@code shutdownNow}, or after setting the policy to jaroslav@1890: * {@code false} when already shutdown. jaroslav@1890: * This value is by default {@code true}. jaroslav@1890: * jaroslav@1890: * @return {@code true} if will execute after shutdown jaroslav@1890: * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy jaroslav@1890: */ jaroslav@1890: public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { jaroslav@1890: return executeExistingDelayedTasksAfterShutdown; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sets the policy on whether cancelled tasks should be immediately jaroslav@1890: * removed from the work queue at time of cancellation. This value is jaroslav@1890: * by default {@code false}. jaroslav@1890: * jaroslav@1890: * @param value if {@code true}, remove on cancellation, else don't jaroslav@1890: * @see #getRemoveOnCancelPolicy jaroslav@1890: * @since 1.7 jaroslav@1890: */ jaroslav@1890: public void setRemoveOnCancelPolicy(boolean value) { jaroslav@1890: removeOnCancel = value; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Gets the policy on whether cancelled tasks should be immediately jaroslav@1890: * removed from the work queue at time of cancellation. This value is jaroslav@1890: * by default {@code false}. jaroslav@1890: * jaroslav@1890: * @return {@code true} if cancelled tasks are immediately removed jaroslav@1890: * from the queue jaroslav@1890: * @see #setRemoveOnCancelPolicy jaroslav@1890: * @since 1.7 jaroslav@1890: */ jaroslav@1890: public boolean getRemoveOnCancelPolicy() { jaroslav@1890: return removeOnCancel; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Initiates an orderly shutdown in which previously submitted jaroslav@1890: * tasks are executed, but no new tasks will be accepted. jaroslav@1890: * Invocation has no additional effect if already shut down. jaroslav@1890: * jaroslav@1890: *

This method does not wait for previously submitted tasks to jaroslav@1890: * complete execution. Use {@link #awaitTermination awaitTermination} jaroslav@1890: * to do that. jaroslav@1890: * jaroslav@1890: *

If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} jaroslav@1890: * has been set {@code false}, existing delayed tasks whose delays jaroslav@1890: * have not yet elapsed are cancelled. And unless the {@code jaroslav@1890: * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set jaroslav@1890: * {@code true}, future executions of existing periodic tasks will jaroslav@1890: * be cancelled. jaroslav@1890: * jaroslav@1890: * @throws SecurityException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public void shutdown() { jaroslav@1890: super.shutdown(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Attempts to stop all actively executing tasks, halts the jaroslav@1890: * processing of waiting tasks, and returns a list of the tasks jaroslav@1890: * that were awaiting execution. jaroslav@1890: * jaroslav@1890: *

This method does not wait for actively executing tasks to jaroslav@1890: * terminate. Use {@link #awaitTermination awaitTermination} to jaroslav@1890: * do that. jaroslav@1890: * jaroslav@1890: *

There are no guarantees beyond best-effort attempts to stop jaroslav@1890: * processing actively executing tasks. This implementation jaroslav@1890: * cancels tasks via {@link Thread#interrupt}, so any task that jaroslav@1890: * fails to respond to interrupts may never terminate. jaroslav@1890: * jaroslav@1890: * @return list of tasks that never commenced execution. jaroslav@1890: * Each element of this list is a {@link ScheduledFuture}, jaroslav@1890: * including those tasks submitted using {@code execute}, jaroslav@1890: * which are for scheduling purposes used as the basis of a jaroslav@1890: * zero-delay {@code ScheduledFuture}. jaroslav@1890: * @throws SecurityException {@inheritDoc} jaroslav@1890: */ jaroslav@1890: public List shutdownNow() { jaroslav@1890: return super.shutdownNow(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Returns the task queue used by this executor. Each element of jaroslav@1890: * this queue is a {@link ScheduledFuture}, including those jaroslav@1890: * tasks submitted using {@code execute} which are for scheduling jaroslav@1890: * purposes used as the basis of a zero-delay jaroslav@1890: * {@code ScheduledFuture}. Iteration over this queue is jaroslav@1890: * not guaranteed to traverse tasks in the order in jaroslav@1890: * which they will execute. jaroslav@1890: * jaroslav@1890: * @return the task queue jaroslav@1890: */ jaroslav@1890: public BlockingQueue getQueue() { jaroslav@1890: return super.getQueue(); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Specialized delay queue. To mesh with TPE declarations, this jaroslav@1890: * class must be declared as a BlockingQueue even though jaroslav@1890: * it can only hold RunnableScheduledFutures. jaroslav@1890: */ jaroslav@1890: static class DelayedWorkQueue extends AbstractQueue jaroslav@1890: implements BlockingQueue { jaroslav@1890: jaroslav@1890: /* jaroslav@1890: * A DelayedWorkQueue is based on a heap-based data structure jaroslav@1890: * like those in DelayQueue and PriorityQueue, except that jaroslav@1890: * every ScheduledFutureTask also records its index into the jaroslav@1890: * heap array. This eliminates the need to find a task upon jaroslav@1890: * cancellation, greatly speeding up removal (down from O(n) jaroslav@1890: * to O(log n)), and reducing garbage retention that would jaroslav@1890: * otherwise occur by waiting for the element to rise to top jaroslav@1890: * before clearing. But because the queue may also hold jaroslav@1890: * RunnableScheduledFutures that are not ScheduledFutureTasks, jaroslav@1890: * we are not guaranteed to have such indices available, in jaroslav@1890: * which case we fall back to linear search. (We expect that jaroslav@1890: * most tasks will not be decorated, and that the faster cases jaroslav@1890: * will be much more common.) jaroslav@1890: * jaroslav@1890: * All heap operations must record index changes -- mainly jaroslav@1890: * within siftUp and siftDown. Upon removal, a task's jaroslav@1890: * heapIndex is set to -1. Note that ScheduledFutureTasks can jaroslav@1890: * appear at most once in the queue (this need not be true for jaroslav@1890: * other kinds of tasks or work queues), so are uniquely jaroslav@1890: * identified by heapIndex. jaroslav@1890: */ jaroslav@1890: jaroslav@1890: private static final int INITIAL_CAPACITY = 16; jaroslav@1890: private RunnableScheduledFuture[] queue = jaroslav@1890: new RunnableScheduledFuture[INITIAL_CAPACITY]; jaroslav@1890: private final ReentrantLock lock = new ReentrantLock(); jaroslav@1890: private int size = 0; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Thread designated to wait for the task at the head of the jaroslav@1890: * queue. This variant of the Leader-Follower pattern jaroslav@1890: * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to jaroslav@1890: * minimize unnecessary timed waiting. When a thread becomes jaroslav@1890: * the leader, it waits only for the next delay to elapse, but jaroslav@1890: * other threads await indefinitely. The leader thread must jaroslav@1890: * signal some other thread before returning from take() or jaroslav@1890: * poll(...), unless some other thread becomes leader in the jaroslav@1890: * interim. Whenever the head of the queue is replaced with a jaroslav@1890: * task with an earlier expiration time, the leader field is jaroslav@1890: * invalidated by being reset to null, and some waiting jaroslav@1890: * thread, but not necessarily the current leader, is jaroslav@1890: * signalled. So waiting threads must be prepared to acquire jaroslav@1890: * and lose leadership while waiting. jaroslav@1890: */ jaroslav@1890: private Thread leader = null; jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Condition signalled when a newer task becomes available at the jaroslav@1890: * head of the queue or a new thread may need to become leader. jaroslav@1890: */ jaroslav@1890: private final Condition available = lock.newCondition(); jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Set f's heapIndex if it is a ScheduledFutureTask. jaroslav@1890: */ jaroslav@1890: private void setIndex(RunnableScheduledFuture f, int idx) { jaroslav@1890: if (f instanceof ScheduledFutureTask) jaroslav@1890: ((ScheduledFutureTask)f).heapIndex = idx; jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sift element added at bottom up to its heap-ordered spot. jaroslav@1890: * Call only when holding lock. jaroslav@1890: */ jaroslav@1890: private void siftUp(int k, RunnableScheduledFuture key) { jaroslav@1890: while (k > 0) { jaroslav@1890: int parent = (k - 1) >>> 1; jaroslav@1890: RunnableScheduledFuture e = queue[parent]; jaroslav@1890: if (key.compareTo(e) >= 0) jaroslav@1890: break; jaroslav@1890: queue[k] = e; jaroslav@1890: setIndex(e, k); jaroslav@1890: k = parent; jaroslav@1890: } jaroslav@1890: queue[k] = key; jaroslav@1890: setIndex(key, k); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Sift element added at top down to its heap-ordered spot. jaroslav@1890: * Call only when holding lock. jaroslav@1890: */ jaroslav@1890: private void siftDown(int k, RunnableScheduledFuture key) { jaroslav@1890: int half = size >>> 1; jaroslav@1890: while (k < half) { jaroslav@1890: int child = (k << 1) + 1; jaroslav@1890: RunnableScheduledFuture c = queue[child]; jaroslav@1890: int right = child + 1; jaroslav@1890: if (right < size && c.compareTo(queue[right]) > 0) jaroslav@1890: c = queue[child = right]; jaroslav@1890: if (key.compareTo(c) <= 0) jaroslav@1890: break; jaroslav@1890: queue[k] = c; jaroslav@1890: setIndex(c, k); jaroslav@1890: k = child; jaroslav@1890: } jaroslav@1890: queue[k] = key; jaroslav@1890: setIndex(key, k); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Resize the heap array. Call only when holding lock. jaroslav@1890: */ jaroslav@1890: private void grow() { jaroslav@1890: int oldCapacity = queue.length; jaroslav@1890: int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% jaroslav@1890: if (newCapacity < 0) // overflow jaroslav@1890: newCapacity = Integer.MAX_VALUE; jaroslav@1890: queue = Arrays.copyOf(queue, newCapacity); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Find index of given object, or -1 if absent jaroslav@1890: */ jaroslav@1890: private int indexOf(Object x) { jaroslav@1890: if (x != null) { jaroslav@1890: if (x instanceof ScheduledFutureTask) { jaroslav@1890: int i = ((ScheduledFutureTask) x).heapIndex; jaroslav@1890: // Sanity check; x could conceivably be a jaroslav@1890: // ScheduledFutureTask from some other pool. jaroslav@1890: if (i >= 0 && i < size && queue[i] == x) jaroslav@1890: return i; jaroslav@1890: } else { jaroslav@1890: for (int i = 0; i < size; i++) jaroslav@1890: if (x.equals(queue[i])) jaroslav@1890: return i; jaroslav@1890: } jaroslav@1890: } jaroslav@1890: return -1; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean contains(Object x) { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: return indexOf(x) != -1; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean remove(Object x) { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: int i = indexOf(x); jaroslav@1890: if (i < 0) jaroslav@1890: return false; jaroslav@1890: jaroslav@1890: setIndex(queue[i], -1); jaroslav@1890: int s = --size; jaroslav@1890: RunnableScheduledFuture replacement = queue[s]; jaroslav@1890: queue[s] = null; jaroslav@1890: if (s != i) { jaroslav@1890: siftDown(i, replacement); jaroslav@1890: if (queue[i] == replacement) jaroslav@1890: siftUp(i, replacement); jaroslav@1890: } jaroslav@1890: return true; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public int size() { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: return size; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean isEmpty() { jaroslav@1890: return size() == 0; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public int remainingCapacity() { jaroslav@1890: return Integer.MAX_VALUE; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public RunnableScheduledFuture peek() { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: return queue[0]; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean offer(Runnable x) { jaroslav@1890: if (x == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: RunnableScheduledFuture e = (RunnableScheduledFuture)x; jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: int i = size; jaroslav@1890: if (i >= queue.length) jaroslav@1890: grow(); jaroslav@1890: size = i + 1; jaroslav@1890: if (i == 0) { jaroslav@1890: queue[0] = e; jaroslav@1890: setIndex(e, 0); jaroslav@1890: } else { jaroslav@1890: siftUp(i, e); jaroslav@1890: } jaroslav@1890: if (queue[0] == e) { jaroslav@1890: leader = null; jaroslav@1890: available.signal(); jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: return true; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public void put(Runnable e) { jaroslav@1890: offer(e); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean add(Runnable e) { jaroslav@1890: return offer(e); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean offer(Runnable e, long timeout, TimeUnit unit) { jaroslav@1890: return offer(e); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Performs common bookkeeping for poll and take: Replaces jaroslav@1890: * first element with last and sifts it down. Call only when jaroslav@1890: * holding lock. jaroslav@1890: * @param f the task to remove and return jaroslav@1890: */ jaroslav@1890: private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { jaroslav@1890: int s = --size; jaroslav@1890: RunnableScheduledFuture x = queue[s]; jaroslav@1890: queue[s] = null; jaroslav@1890: if (s != 0) jaroslav@1890: siftDown(0, x); jaroslav@1890: setIndex(f, -1); jaroslav@1890: return f; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public RunnableScheduledFuture poll() { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: RunnableScheduledFuture first = queue[0]; jaroslav@1890: if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) jaroslav@1890: return null; jaroslav@1890: else jaroslav@1890: return finishPoll(first); jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public RunnableScheduledFuture take() throws InterruptedException { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lockInterruptibly(); jaroslav@1890: try { jaroslav@1890: for (;;) { jaroslav@1890: RunnableScheduledFuture first = queue[0]; jaroslav@1890: if (first == null) jaroslav@1890: available.await(); jaroslav@1890: else { jaroslav@1890: long delay = first.getDelay(TimeUnit.NANOSECONDS); jaroslav@1890: if (delay <= 0) jaroslav@1890: return finishPoll(first); jaroslav@1890: else if (leader != null) jaroslav@1890: available.await(); jaroslav@1890: else { jaroslav@1890: Thread thisThread = Thread.currentThread(); jaroslav@1890: leader = thisThread; jaroslav@1890: try { jaroslav@1890: available.awaitNanos(delay); jaroslav@1890: } finally { jaroslav@1890: if (leader == thisThread) jaroslav@1890: leader = null; jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: if (leader == null && queue[0] != null) jaroslav@1890: available.signal(); jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public RunnableScheduledFuture poll(long timeout, TimeUnit unit) jaroslav@1890: throws InterruptedException { jaroslav@1890: long nanos = unit.toNanos(timeout); jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lockInterruptibly(); jaroslav@1890: try { jaroslav@1890: for (;;) { jaroslav@1890: RunnableScheduledFuture first = queue[0]; jaroslav@1890: if (first == null) { jaroslav@1890: if (nanos <= 0) jaroslav@1890: return null; jaroslav@1890: else jaroslav@1890: nanos = available.awaitNanos(nanos); jaroslav@1890: } else { jaroslav@1890: long delay = first.getDelay(TimeUnit.NANOSECONDS); jaroslav@1890: if (delay <= 0) jaroslav@1890: return finishPoll(first); jaroslav@1890: if (nanos <= 0) jaroslav@1890: return null; jaroslav@1890: if (nanos < delay || leader != null) jaroslav@1890: nanos = available.awaitNanos(nanos); jaroslav@1890: else { jaroslav@1890: Thread thisThread = Thread.currentThread(); jaroslav@1890: leader = thisThread; jaroslav@1890: try { jaroslav@1890: long timeLeft = available.awaitNanos(delay); jaroslav@1890: nanos -= delay - timeLeft; jaroslav@1890: } finally { jaroslav@1890: if (leader == thisThread) jaroslav@1890: leader = null; jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } finally { jaroslav@1890: if (leader == null && queue[0] != null) jaroslav@1890: available.signal(); jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public void clear() { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: for (int i = 0; i < size; i++) { jaroslav@1890: RunnableScheduledFuture t = queue[i]; jaroslav@1890: if (t != null) { jaroslav@1890: queue[i] = null; jaroslav@1890: setIndex(t, -1); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: size = 0; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Return and remove first element only if it is expired. jaroslav@1890: * Used only by drainTo. Call only when holding lock. jaroslav@1890: */ jaroslav@1890: private RunnableScheduledFuture pollExpired() { jaroslav@1890: RunnableScheduledFuture first = queue[0]; jaroslav@1890: if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) jaroslav@1890: return null; jaroslav@1890: return finishPoll(first); jaroslav@1890: } jaroslav@1890: jaroslav@1890: public int drainTo(Collection c) { jaroslav@1890: if (c == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: if (c == this) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: RunnableScheduledFuture first; jaroslav@1890: int n = 0; jaroslav@1890: while ((first = pollExpired()) != null) { jaroslav@1890: c.add(first); jaroslav@1890: ++n; jaroslav@1890: } jaroslav@1890: return n; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public int drainTo(Collection c, int maxElements) { jaroslav@1890: if (c == null) jaroslav@1890: throw new NullPointerException(); jaroslav@1890: if (c == this) jaroslav@1890: throw new IllegalArgumentException(); jaroslav@1890: if (maxElements <= 0) jaroslav@1890: return 0; jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: RunnableScheduledFuture first; jaroslav@1890: int n = 0; jaroslav@1890: while (n < maxElements && (first = pollExpired()) != null) { jaroslav@1890: c.add(first); jaroslav@1890: ++n; jaroslav@1890: } jaroslav@1890: return n; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public Object[] toArray() { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: return Arrays.copyOf(queue, size, Object[].class); jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: @SuppressWarnings("unchecked") jaroslav@1890: public T[] toArray(T[] a) { jaroslav@1890: final ReentrantLock lock = this.lock; jaroslav@1890: lock.lock(); jaroslav@1890: try { jaroslav@1890: if (a.length < size) jaroslav@1890: return (T[]) Arrays.copyOf(queue, size, a.getClass()); jaroslav@1890: System.arraycopy(queue, 0, a, 0, size); jaroslav@1890: if (a.length > size) jaroslav@1890: a[size] = null; jaroslav@1890: return a; jaroslav@1890: } finally { jaroslav@1890: lock.unlock(); jaroslav@1890: } jaroslav@1890: } jaroslav@1890: jaroslav@1890: public Iterator iterator() { jaroslav@1890: return new Itr(Arrays.copyOf(queue, size)); jaroslav@1890: } jaroslav@1890: jaroslav@1890: /** jaroslav@1890: * Snapshot iterator that works off copy of underlying q array. jaroslav@1890: */ jaroslav@1890: private class Itr implements Iterator { jaroslav@1890: final RunnableScheduledFuture[] array; jaroslav@1890: int cursor = 0; // index of next element to return jaroslav@1890: int lastRet = -1; // index of last element, or -1 if no such jaroslav@1890: jaroslav@1890: Itr(RunnableScheduledFuture[] array) { jaroslav@1890: this.array = array; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public boolean hasNext() { jaroslav@1890: return cursor < array.length; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public Runnable next() { jaroslav@1890: if (cursor >= array.length) jaroslav@1890: throw new NoSuchElementException(); jaroslav@1890: lastRet = cursor; jaroslav@1890: return array[cursor++]; jaroslav@1890: } jaroslav@1890: jaroslav@1890: public void remove() { jaroslav@1890: if (lastRet < 0) jaroslav@1890: throw new IllegalStateException(); jaroslav@1890: DelayedWorkQueue.this.remove(array[lastRet]); jaroslav@1890: lastRet = -1; jaroslav@1890: } jaroslav@1890: } jaroslav@1890: } jaroslav@1890: }