rt/emul/compact/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
36 package java.util.concurrent;
37 import java.util.concurrent.atomic.*;
38 import java.util.concurrent.locks.*;
42 * A {@link ThreadPoolExecutor} that can additionally schedule
43 * commands to run after a given delay, or to execute
44 * periodically. This class is preferable to {@link java.util.Timer}
45 * when multiple worker threads are needed, or when the additional
46 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
47 * this class extends) are required.
49 * <p>Delayed tasks execute no sooner than they are enabled, but
50 * without any real-time guarantees about when, after they are
51 * enabled, they will commence. Tasks scheduled for exactly the same
52 * execution time are enabled in first-in-first-out (FIFO) order of
55 * <p>When a submitted task is cancelled before it is run, execution
56 * is suppressed. By default, such a cancelled task is not
57 * automatically removed from the work queue until its delay
58 * elapses. While this enables further inspection and monitoring, it
59 * may also cause unbounded retention of cancelled tasks. To avoid
60 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
61 * causes tasks to be immediately removed from the work queue at
62 * time of cancellation.
64 * <p>Successive executions of a task scheduled via
65 * {@code scheduleAtFixedRate} or
66 * {@code scheduleWithFixedDelay} do not overlap. While different
67 * executions may be performed by different threads, the effects of
69 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
70 * those of subsequent ones.
72 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
73 * of the inherited tuning methods are not useful for it. In
74 * particular, because it acts as a fixed-sized pool using
75 * {@code corePoolSize} threads and an unbounded queue, adjustments
76 * to {@code maximumPoolSize} have no useful effect. Additionally, it
77 * is almost never a good idea to set {@code corePoolSize} to zero or
78 * use {@code allowCoreThreadTimeOut} because this may leave the pool
79 * without threads to handle tasks once they become eligible to run.
81 * <p><b>Extension notes:</b> This class overrides the
82 * {@link ThreadPoolExecutor#execute execute} and
83 * {@link AbstractExecutorService#submit(Runnable) submit}
84 * methods to generate internal {@link ScheduledFuture} objects to
85 * control per-task delays and scheduling. To preserve
86 * functionality, any further overrides of these methods in
87 * subclasses must invoke superclass versions, which effectively
88 * disables additional task customization. However, this class
89 * provides alternative protected extension method
90 * {@code decorateTask} (one version each for {@code Runnable} and
91 * {@code Callable}) that can be used to customize the concrete task
92 * types used to execute commands entered via {@code execute},
93 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
94 * and {@code scheduleWithFixedDelay}. By default, a
95 * {@code ScheduledThreadPoolExecutor} uses a task type extending
96 * {@link FutureTask}. However, this may be modified or replaced using
97 * subclasses of the form:
100 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
102 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
104 * protected <V> RunnableScheduledFuture<V> decorateTask(
105 * Runnable r, RunnableScheduledFuture<V> task) {
106 * return new CustomTask<V>(r, task);
109 * protected <V> RunnableScheduledFuture<V> decorateTask(
110 * Callable<V> c, RunnableScheduledFuture<V> task) {
111 * return new CustomTask<V>(c, task);
113 * // ... add constructors, etc.
119 public class ScheduledThreadPoolExecutor
120 extends ThreadPoolExecutor
121 implements ScheduledExecutorService {
124 * This class specializes ThreadPoolExecutor implementation by
126 * 1. Using a custom task type, ScheduledFutureTask for
127 * tasks, even those that don't require scheduling (i.e.,
128 * those submitted using ExecutorService execute, not
129 * ScheduledExecutorService methods) which are treated as
130 * delayed tasks with a delay of zero.
132 * 2. Using a custom queue (DelayedWorkQueue), a variant of
133 * unbounded DelayQueue. The lack of capacity constraint and
134 * the fact that corePoolSize and maximumPoolSize are
135 * effectively identical simplifies some execution mechanics
136 * (see delayedExecute) compared to ThreadPoolExecutor.
138 * 3. Supporting optional run-after-shutdown parameters, which
139 * leads to overrides of shutdown methods to remove and cancel
140 * tasks that should NOT be run after shutdown, as well as
141 * different recheck logic when task (re)submission overlaps
144 * 4. Task decoration methods to allow interception and
145 * instrumentation, which are needed because subclasses cannot
146 * otherwise override submit methods to get this effect. These
147 * don't have any impact on pool control logic though.
151 * False if should cancel/suppress periodic tasks on shutdown.
153 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
156 * False if should cancel non-periodic tasks on shutdown.
158 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
161 * True if ScheduledFutureTask.cancel should remove from queue
163 private volatile boolean removeOnCancel = false;
166 * Sequence number to break scheduling ties, and in turn to
167 * guarantee FIFO order among tied entries.
169 private static final AtomicLong sequencer = new AtomicLong(0);
172 * Returns current nanosecond time.
175 return System.nanoTime();
178 private class ScheduledFutureTask<V>
179 extends FutureTask<V> implements RunnableScheduledFuture<V> {
181 /** Sequence number to break ties FIFO */
182 private final long sequenceNumber;
184 /** The time the task is enabled to execute in nanoTime units */
188 * Period in nanoseconds for repeating tasks. A positive
189 * value indicates fixed-rate execution. A negative value
190 * indicates fixed-delay execution. A value of 0 indicates a
191 * non-repeating task.
193 private final long period;
195 /** The actual task to be re-enqueued by reExecutePeriodic */
196 RunnableScheduledFuture<V> outerTask = this;
199 * Index into delay queue, to support faster cancellation.
204 * Creates a one-shot action with given nanoTime-based trigger time.
206 ScheduledFutureTask(Runnable r, V result, long ns) {
210 this.sequenceNumber = sequencer.getAndIncrement();
214 * Creates a periodic action with given nano time and period.
216 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
219 this.period = period;
220 this.sequenceNumber = sequencer.getAndIncrement();
224 * Creates a one-shot action with given nanoTime-based trigger.
226 ScheduledFutureTask(Callable<V> callable, long ns) {
230 this.sequenceNumber = sequencer.getAndIncrement();
233 public long getDelay(TimeUnit unit) {
234 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
237 public int compareTo(Delayed other) {
238 if (other == this) // compare zero ONLY if same object
240 if (other instanceof ScheduledFutureTask) {
241 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
242 long diff = time - x.time;
247 else if (sequenceNumber < x.sequenceNumber)
252 long d = (getDelay(TimeUnit.NANOSECONDS) -
253 other.getDelay(TimeUnit.NANOSECONDS));
254 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
258 * Returns true if this is a periodic (not a one-shot) action.
260 * @return true if periodic
262 public boolean isPeriodic() {
267 * Sets the next time to run for a periodic task.
269 private void setNextRunTime() {
274 time = triggerTime(-p);
277 public boolean cancel(boolean mayInterruptIfRunning) {
278 boolean cancelled = super.cancel(mayInterruptIfRunning);
279 if (cancelled && removeOnCancel && heapIndex >= 0)
285 * Overrides FutureTask version so as to reset/requeue if periodic.
288 boolean periodic = isPeriodic();
289 if (!canRunInCurrentRunState(periodic))
292 ScheduledFutureTask.super.run();
293 else if (ScheduledFutureTask.super.runAndReset()) {
295 reExecutePeriodic(outerTask);
301 * Returns true if can run a task given current run state
302 * and run-after-shutdown parameters.
304 * @param periodic true if this task periodic, false if delayed
306 boolean canRunInCurrentRunState(boolean periodic) {
307 return isRunningOrShutdown(periodic ?
308 continueExistingPeriodicTasksAfterShutdown :
309 executeExistingDelayedTasksAfterShutdown);
313 * Main execution method for delayed or periodic tasks. If pool
314 * is shut down, rejects the task. Otherwise adds task to queue
315 * and starts a thread, if necessary, to run it. (We cannot
316 * prestart the thread to run the task because the task (probably)
317 * shouldn't be run yet,) If the pool is shut down while the task
318 * is being added, cancel and remove it if required by state and
319 * run-after-shutdown parameters.
321 * @param task the task
323 private void delayedExecute(RunnableScheduledFuture<?> task) {
327 super.getQueue().add(task);
329 !canRunInCurrentRunState(task.isPeriodic()) &&
333 prestartCoreThread();
338 * Requeues a periodic task unless current run state precludes it.
339 * Same idea as delayedExecute except drops task rather than rejecting.
341 * @param task the task
343 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
344 if (canRunInCurrentRunState(true)) {
345 super.getQueue().add(task);
346 if (!canRunInCurrentRunState(true) && remove(task))
349 prestartCoreThread();
354 * Cancels and clears the queue of all tasks that should not be run
355 * due to shutdown policy. Invoked within super.shutdown.
357 @Override void onShutdown() {
358 BlockingQueue<Runnable> q = super.getQueue();
359 boolean keepDelayed =
360 getExecuteExistingDelayedTasksAfterShutdownPolicy();
361 boolean keepPeriodic =
362 getContinueExistingPeriodicTasksAfterShutdownPolicy();
363 if (!keepDelayed && !keepPeriodic) {
364 for (Object e : q.toArray())
365 if (e instanceof RunnableScheduledFuture<?>)
366 ((RunnableScheduledFuture<?>) e).cancel(false);
370 // Traverse snapshot to avoid iterator exceptions
371 for (Object e : q.toArray()) {
372 if (e instanceof RunnableScheduledFuture) {
373 RunnableScheduledFuture<?> t =
374 (RunnableScheduledFuture<?>)e;
375 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
376 t.isCancelled()) { // also remove if already cancelled
387 * Modifies or replaces the task used to execute a runnable.
388 * This method can be used to override the concrete
389 * class used for managing internal tasks.
390 * The default implementation simply returns the given task.
392 * @param runnable the submitted Runnable
393 * @param task the task created to execute the runnable
394 * @return a task that can execute the runnable
397 protected <V> RunnableScheduledFuture<V> decorateTask(
398 Runnable runnable, RunnableScheduledFuture<V> task) {
403 * Modifies or replaces the task used to execute a callable.
404 * This method can be used to override the concrete
405 * class used for managing internal tasks.
406 * The default implementation simply returns the given task.
408 * @param callable the submitted Callable
409 * @param task the task created to execute the callable
410 * @return a task that can execute the callable
413 protected <V> RunnableScheduledFuture<V> decorateTask(
414 Callable<V> callable, RunnableScheduledFuture<V> task) {
419 * Creates a new {@code ScheduledThreadPoolExecutor} with the
420 * given core pool size.
422 * @param corePoolSize the number of threads to keep in the pool, even
423 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
424 * @throws IllegalArgumentException if {@code corePoolSize < 0}
426 public ScheduledThreadPoolExecutor(int corePoolSize) {
427 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
428 new DelayedWorkQueue());
432 * Creates a new {@code ScheduledThreadPoolExecutor} with the
433 * given initial parameters.
435 * @param corePoolSize the number of threads to keep in the pool, even
436 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
437 * @param threadFactory the factory to use when the executor
438 * creates a new thread
439 * @throws IllegalArgumentException if {@code corePoolSize < 0}
440 * @throws NullPointerException if {@code threadFactory} is null
442 public ScheduledThreadPoolExecutor(int corePoolSize,
443 ThreadFactory threadFactory) {
444 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
445 new DelayedWorkQueue(), threadFactory);
449 * Creates a new ScheduledThreadPoolExecutor with the given
450 * initial parameters.
452 * @param corePoolSize the number of threads to keep in the pool, even
453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
454 * @param handler the handler to use when execution is blocked
455 * because the thread bounds and queue capacities are reached
456 * @throws IllegalArgumentException if {@code corePoolSize < 0}
457 * @throws NullPointerException if {@code handler} is null
459 public ScheduledThreadPoolExecutor(int corePoolSize,
460 RejectedExecutionHandler handler) {
461 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
462 new DelayedWorkQueue(), handler);
466 * Creates a new ScheduledThreadPoolExecutor with the given
467 * initial parameters.
469 * @param corePoolSize the number of threads to keep in the pool, even
470 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
471 * @param threadFactory the factory to use when the executor
472 * creates a new thread
473 * @param handler the handler to use when execution is blocked
474 * because the thread bounds and queue capacities are reached
475 * @throws IllegalArgumentException if {@code corePoolSize < 0}
476 * @throws NullPointerException if {@code threadFactory} or
477 * {@code handler} is null
479 public ScheduledThreadPoolExecutor(int corePoolSize,
480 ThreadFactory threadFactory,
481 RejectedExecutionHandler handler) {
482 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
483 new DelayedWorkQueue(), threadFactory, handler);
487 * Returns the trigger time of a delayed action.
489 private long triggerTime(long delay, TimeUnit unit) {
490 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
494 * Returns the trigger time of a delayed action.
496 long triggerTime(long delay) {
498 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
502 * Constrains the values of all delays in the queue to be within
503 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
504 * This may occur if a task is eligible to be dequeued, but has
505 * not yet been, while some other task is added with a delay of
508 private long overflowFree(long delay) {
509 Delayed head = (Delayed) super.getQueue().peek();
511 long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
512 if (headDelay < 0 && (delay - headDelay < 0))
513 delay = Long.MAX_VALUE + headDelay;
519 * @throws RejectedExecutionException {@inheritDoc}
520 * @throws NullPointerException {@inheritDoc}
522 public ScheduledFuture<?> schedule(Runnable command,
525 if (command == null || unit == null)
526 throw new NullPointerException();
527 RunnableScheduledFuture<?> t = decorateTask(command,
528 new ScheduledFutureTask<Void>(command, null,
529 triggerTime(delay, unit)));
535 * @throws RejectedExecutionException {@inheritDoc}
536 * @throws NullPointerException {@inheritDoc}
538 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
541 if (callable == null || unit == null)
542 throw new NullPointerException();
543 RunnableScheduledFuture<V> t = decorateTask(callable,
544 new ScheduledFutureTask<V>(callable,
545 triggerTime(delay, unit)));
551 * @throws RejectedExecutionException {@inheritDoc}
552 * @throws NullPointerException {@inheritDoc}
553 * @throws IllegalArgumentException {@inheritDoc}
555 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
559 if (command == null || unit == null)
560 throw new NullPointerException();
562 throw new IllegalArgumentException();
563 ScheduledFutureTask<Void> sft =
564 new ScheduledFutureTask<Void>(command,
566 triggerTime(initialDelay, unit),
567 unit.toNanos(period));
568 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
575 * @throws RejectedExecutionException {@inheritDoc}
576 * @throws NullPointerException {@inheritDoc}
577 * @throws IllegalArgumentException {@inheritDoc}
579 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
583 if (command == null || unit == null)
584 throw new NullPointerException();
586 throw new IllegalArgumentException();
587 ScheduledFutureTask<Void> sft =
588 new ScheduledFutureTask<Void>(command,
590 triggerTime(initialDelay, unit),
591 unit.toNanos(-delay));
592 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
599 * Executes {@code command} with zero required delay.
600 * This has effect equivalent to
601 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
602 * Note that inspections of the queue and of the list returned by
603 * {@code shutdownNow} will access the zero-delayed
604 * {@link ScheduledFuture}, not the {@code command} itself.
606 * <p>A consequence of the use of {@code ScheduledFuture} objects is
607 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
608 * called with a null second {@code Throwable} argument, even if the
609 * {@code command} terminated abruptly. Instead, the {@code Throwable}
610 * thrown by such a task can be obtained via {@link Future#get}.
612 * @throws RejectedExecutionException at discretion of
613 * {@code RejectedExecutionHandler}, if the task
614 * cannot be accepted for execution because the
615 * executor has been shut down
616 * @throws NullPointerException {@inheritDoc}
618 public void execute(Runnable command) {
619 schedule(command, 0, TimeUnit.NANOSECONDS);
622 // Override AbstractExecutorService methods
625 * @throws RejectedExecutionException {@inheritDoc}
626 * @throws NullPointerException {@inheritDoc}
628 public Future<?> submit(Runnable task) {
629 return schedule(task, 0, TimeUnit.NANOSECONDS);
633 * @throws RejectedExecutionException {@inheritDoc}
634 * @throws NullPointerException {@inheritDoc}
636 public <T> Future<T> submit(Runnable task, T result) {
637 return schedule(Executors.callable(task, result),
638 0, TimeUnit.NANOSECONDS);
642 * @throws RejectedExecutionException {@inheritDoc}
643 * @throws NullPointerException {@inheritDoc}
645 public <T> Future<T> submit(Callable<T> task) {
646 return schedule(task, 0, TimeUnit.NANOSECONDS);
650 * Sets the policy on whether to continue executing existing
651 * periodic tasks even when this executor has been {@code shutdown}.
652 * In this case, these tasks will only terminate upon
653 * {@code shutdownNow} or after setting the policy to
654 * {@code false} when already shutdown.
655 * This value is by default {@code false}.
657 * @param value if {@code true}, continue after shutdown, else don't.
658 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
660 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
661 continueExistingPeriodicTasksAfterShutdown = value;
662 if (!value && isShutdown())
667 * Gets the policy on whether to continue executing existing
668 * periodic tasks even when this executor has been {@code shutdown}.
669 * In this case, these tasks will only terminate upon
670 * {@code shutdownNow} or after setting the policy to
671 * {@code false} when already shutdown.
672 * This value is by default {@code false}.
674 * @return {@code true} if will continue after shutdown
675 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
677 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
678 return continueExistingPeriodicTasksAfterShutdown;
682 * Sets the policy on whether to execute existing delayed
683 * tasks even when this executor has been {@code shutdown}.
684 * In this case, these tasks will only terminate upon
685 * {@code shutdownNow}, or after setting the policy to
686 * {@code false} when already shutdown.
687 * This value is by default {@code true}.
689 * @param value if {@code true}, execute after shutdown, else don't.
690 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
692 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
693 executeExistingDelayedTasksAfterShutdown = value;
694 if (!value && isShutdown())
699 * Gets the policy on whether to execute existing delayed
700 * tasks even when this executor has been {@code shutdown}.
701 * In this case, these tasks will only terminate upon
702 * {@code shutdownNow}, or after setting the policy to
703 * {@code false} when already shutdown.
704 * This value is by default {@code true}.
706 * @return {@code true} if will execute after shutdown
707 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
709 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
710 return executeExistingDelayedTasksAfterShutdown;
714 * Sets the policy on whether cancelled tasks should be immediately
715 * removed from the work queue at time of cancellation. This value is
716 * by default {@code false}.
718 * @param value if {@code true}, remove on cancellation, else don't
719 * @see #getRemoveOnCancelPolicy
722 public void setRemoveOnCancelPolicy(boolean value) {
723 removeOnCancel = value;
727 * Gets the policy on whether cancelled tasks should be immediately
728 * removed from the work queue at time of cancellation. This value is
729 * by default {@code false}.
731 * @return {@code true} if cancelled tasks are immediately removed
733 * @see #setRemoveOnCancelPolicy
736 public boolean getRemoveOnCancelPolicy() {
737 return removeOnCancel;
741 * Initiates an orderly shutdown in which previously submitted
742 * tasks are executed, but no new tasks will be accepted.
743 * Invocation has no additional effect if already shut down.
745 * <p>This method does not wait for previously submitted tasks to
746 * complete execution. Use {@link #awaitTermination awaitTermination}
749 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
750 * has been set {@code false}, existing delayed tasks whose delays
751 * have not yet elapsed are cancelled. And unless the {@code
752 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
753 * {@code true}, future executions of existing periodic tasks will
756 * @throws SecurityException {@inheritDoc}
758 public void shutdown() {
763 * Attempts to stop all actively executing tasks, halts the
764 * processing of waiting tasks, and returns a list of the tasks
765 * that were awaiting execution.
767 * <p>This method does not wait for actively executing tasks to
768 * terminate. Use {@link #awaitTermination awaitTermination} to
771 * <p>There are no guarantees beyond best-effort attempts to stop
772 * processing actively executing tasks. This implementation
773 * cancels tasks via {@link Thread#interrupt}, so any task that
774 * fails to respond to interrupts may never terminate.
776 * @return list of tasks that never commenced execution.
777 * Each element of this list is a {@link ScheduledFuture},
778 * including those tasks submitted using {@code execute},
779 * which are for scheduling purposes used as the basis of a
780 * zero-delay {@code ScheduledFuture}.
781 * @throws SecurityException {@inheritDoc}
783 public List<Runnable> shutdownNow() {
784 return super.shutdownNow();
788 * Returns the task queue used by this executor. Each element of
789 * this queue is a {@link ScheduledFuture}, including those
790 * tasks submitted using {@code execute} which are for scheduling
791 * purposes used as the basis of a zero-delay
792 * {@code ScheduledFuture}. Iteration over this queue is
793 * <em>not</em> guaranteed to traverse tasks in the order in
794 * which they will execute.
796 * @return the task queue
798 public BlockingQueue<Runnable> getQueue() {
799 return super.getQueue();
803 * Specialized delay queue. To mesh with TPE declarations, this
804 * class must be declared as a BlockingQueue<Runnable> even though
805 * it can only hold RunnableScheduledFutures.
807 static class DelayedWorkQueue extends AbstractQueue<Runnable>
808 implements BlockingQueue<Runnable> {
811 * A DelayedWorkQueue is based on a heap-based data structure
812 * like those in DelayQueue and PriorityQueue, except that
813 * every ScheduledFutureTask also records its index into the
814 * heap array. This eliminates the need to find a task upon
815 * cancellation, greatly speeding up removal (down from O(n)
816 * to O(log n)), and reducing garbage retention that would
817 * otherwise occur by waiting for the element to rise to top
818 * before clearing. But because the queue may also hold
819 * RunnableScheduledFutures that are not ScheduledFutureTasks,
820 * we are not guaranteed to have such indices available, in
821 * which case we fall back to linear search. (We expect that
822 * most tasks will not be decorated, and that the faster cases
823 * will be much more common.)
825 * All heap operations must record index changes -- mainly
826 * within siftUp and siftDown. Upon removal, a task's
827 * heapIndex is set to -1. Note that ScheduledFutureTasks can
828 * appear at most once in the queue (this need not be true for
829 * other kinds of tasks or work queues), so are uniquely
830 * identified by heapIndex.
833 private static final int INITIAL_CAPACITY = 16;
834 private RunnableScheduledFuture[] queue =
835 new RunnableScheduledFuture[INITIAL_CAPACITY];
836 private final ReentrantLock lock = new ReentrantLock();
837 private int size = 0;
840 * Thread designated to wait for the task at the head of the
841 * queue. This variant of the Leader-Follower pattern
842 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
843 * minimize unnecessary timed waiting. When a thread becomes
844 * the leader, it waits only for the next delay to elapse, but
845 * other threads await indefinitely. The leader thread must
846 * signal some other thread before returning from take() or
847 * poll(...), unless some other thread becomes leader in the
848 * interim. Whenever the head of the queue is replaced with a
849 * task with an earlier expiration time, the leader field is
850 * invalidated by being reset to null, and some waiting
851 * thread, but not necessarily the current leader, is
852 * signalled. So waiting threads must be prepared to acquire
853 * and lose leadership while waiting.
855 private Thread leader = null;
858 * Condition signalled when a newer task becomes available at the
859 * head of the queue or a new thread may need to become leader.
861 private final Condition available = lock.newCondition();
864 * Set f's heapIndex if it is a ScheduledFutureTask.
866 private void setIndex(RunnableScheduledFuture f, int idx) {
867 if (f instanceof ScheduledFutureTask)
868 ((ScheduledFutureTask)f).heapIndex = idx;
872 * Sift element added at bottom up to its heap-ordered spot.
873 * Call only when holding lock.
875 private void siftUp(int k, RunnableScheduledFuture key) {
877 int parent = (k - 1) >>> 1;
878 RunnableScheduledFuture e = queue[parent];
879 if (key.compareTo(e) >= 0)
890 * Sift element added at top down to its heap-ordered spot.
891 * Call only when holding lock.
893 private void siftDown(int k, RunnableScheduledFuture key) {
894 int half = size >>> 1;
896 int child = (k << 1) + 1;
897 RunnableScheduledFuture c = queue[child];
898 int right = child + 1;
899 if (right < size && c.compareTo(queue[right]) > 0)
900 c = queue[child = right];
901 if (key.compareTo(c) <= 0)
912 * Resize the heap array. Call only when holding lock.
914 private void grow() {
915 int oldCapacity = queue.length;
916 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
917 if (newCapacity < 0) // overflow
918 newCapacity = Integer.MAX_VALUE;
919 queue = Arrays.copyOf(queue, newCapacity);
923 * Find index of given object, or -1 if absent
925 private int indexOf(Object x) {
927 if (x instanceof ScheduledFutureTask) {
928 int i = ((ScheduledFutureTask) x).heapIndex;
929 // Sanity check; x could conceivably be a
930 // ScheduledFutureTask from some other pool.
931 if (i >= 0 && i < size && queue[i] == x)
934 for (int i = 0; i < size; i++)
935 if (x.equals(queue[i]))
942 public boolean contains(Object x) {
943 final ReentrantLock lock = this.lock;
946 return indexOf(x) != -1;
952 public boolean remove(Object x) {
953 final ReentrantLock lock = this.lock;
960 setIndex(queue[i], -1);
962 RunnableScheduledFuture replacement = queue[s];
965 siftDown(i, replacement);
966 if (queue[i] == replacement)
967 siftUp(i, replacement);
976 final ReentrantLock lock = this.lock;
985 public boolean isEmpty() {
989 public int remainingCapacity() {
990 return Integer.MAX_VALUE;
993 public RunnableScheduledFuture peek() {
994 final ReentrantLock lock = this.lock;
1003 public boolean offer(Runnable x) {
1005 throw new NullPointerException();
1006 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1007 final ReentrantLock lock = this.lock;
1011 if (i >= queue.length)
1020 if (queue[0] == e) {
1030 public void put(Runnable e) {
1034 public boolean add(Runnable e) {
1038 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1043 * Performs common bookkeeping for poll and take: Replaces
1044 * first element with last and sifts it down. Call only when
1046 * @param f the task to remove and return
1048 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1050 RunnableScheduledFuture x = queue[s];
1058 public RunnableScheduledFuture poll() {
1059 final ReentrantLock lock = this.lock;
1062 RunnableScheduledFuture first = queue[0];
1063 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1066 return finishPoll(first);
1072 public RunnableScheduledFuture take() throws InterruptedException {
1073 final ReentrantLock lock = this.lock;
1074 lock.lockInterruptibly();
1077 RunnableScheduledFuture first = queue[0];
1081 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1083 return finishPoll(first);
1084 else if (leader != null)
1087 Thread thisThread = Thread.currentThread();
1088 leader = thisThread;
1090 available.awaitNanos(delay);
1092 if (leader == thisThread)
1099 if (leader == null && queue[0] != null)
1105 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1106 throws InterruptedException {
1107 long nanos = unit.toNanos(timeout);
1108 final ReentrantLock lock = this.lock;
1109 lock.lockInterruptibly();
1112 RunnableScheduledFuture first = queue[0];
1113 if (first == null) {
1117 nanos = available.awaitNanos(nanos);
1119 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1121 return finishPoll(first);
1124 if (nanos < delay || leader != null)
1125 nanos = available.awaitNanos(nanos);
1127 Thread thisThread = Thread.currentThread();
1128 leader = thisThread;
1130 long timeLeft = available.awaitNanos(delay);
1131 nanos -= delay - timeLeft;
1133 if (leader == thisThread)
1140 if (leader == null && queue[0] != null)
1146 public void clear() {
1147 final ReentrantLock lock = this.lock;
1150 for (int i = 0; i < size; i++) {
1151 RunnableScheduledFuture t = queue[i];
1164 * Return and remove first element only if it is expired.
1165 * Used only by drainTo. Call only when holding lock.
1167 private RunnableScheduledFuture pollExpired() {
1168 RunnableScheduledFuture first = queue[0];
1169 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1171 return finishPoll(first);
1174 public int drainTo(Collection<? super Runnable> c) {
1176 throw new NullPointerException();
1178 throw new IllegalArgumentException();
1179 final ReentrantLock lock = this.lock;
1182 RunnableScheduledFuture first;
1184 while ((first = pollExpired()) != null) {
1194 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1196 throw new NullPointerException();
1198 throw new IllegalArgumentException();
1199 if (maxElements <= 0)
1201 final ReentrantLock lock = this.lock;
1204 RunnableScheduledFuture first;
1206 while (n < maxElements && (first = pollExpired()) != null) {
1216 public Object[] toArray() {
1217 final ReentrantLock lock = this.lock;
1220 return Arrays.copyOf(queue, size, Object[].class);
1226 @SuppressWarnings("unchecked")
1227 public <T> T[] toArray(T[] a) {
1228 final ReentrantLock lock = this.lock;
1231 if (a.length < size)
1232 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1233 System.arraycopy(queue, 0, a, 0, size);
1234 if (a.length > size)
1242 public Iterator<Runnable> iterator() {
1243 return new Itr(Arrays.copyOf(queue, size));
1247 * Snapshot iterator that works off copy of underlying q array.
1249 private class Itr implements Iterator<Runnable> {
1250 final RunnableScheduledFuture[] array;
1251 int cursor = 0; // index of next element to return
1252 int lastRet = -1; // index of last element, or -1 if no such
1254 Itr(RunnableScheduledFuture[] array) {
1258 public boolean hasNext() {
1259 return cursor < array.length;
1262 public Runnable next() {
1263 if (cursor >= array.length)
1264 throw new NoSuchElementException();
1266 return array[cursor++];
1269 public void remove() {
1271 throw new IllegalStateException();
1272 DelayedWorkQueue.this.remove(array[lastRet]);