rt/emul/compact/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,1277 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +import java.util.concurrent.atomic.*;
1.41 +import java.util.concurrent.locks.*;
1.42 +import java.util.*;
1.43 +
1.44 +/**
1.45 + * A {@link ThreadPoolExecutor} that can additionally schedule
1.46 + * commands to run after a given delay, or to execute
1.47 + * periodically. This class is preferable to {@link java.util.Timer}
1.48 + * when multiple worker threads are needed, or when the additional
1.49 + * flexibility or capabilities of {@link ThreadPoolExecutor} (which
1.50 + * this class extends) are required.
1.51 + *
1.52 + * <p>Delayed tasks execute no sooner than they are enabled, but
1.53 + * without any real-time guarantees about when, after they are
1.54 + * enabled, they will commence. Tasks scheduled for exactly the same
1.55 + * execution time are enabled in first-in-first-out (FIFO) order of
1.56 + * submission.
1.57 + *
1.58 + * <p>When a submitted task is cancelled before it is run, execution
1.59 + * is suppressed. By default, such a cancelled task is not
1.60 + * automatically removed from the work queue until its delay
1.61 + * elapses. While this enables further inspection and monitoring, it
1.62 + * may also cause unbounded retention of cancelled tasks. To avoid
1.63 + * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
1.64 + * causes tasks to be immediately removed from the work queue at
1.65 + * time of cancellation.
1.66 + *
1.67 + * <p>Successive executions of a task scheduled via
1.68 + * {@code scheduleAtFixedRate} or
1.69 + * {@code scheduleWithFixedDelay} do not overlap. While different
1.70 + * executions may be performed by different threads, the effects of
1.71 + * prior executions <a
1.72 + * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
1.73 + * those of subsequent ones.
1.74 + *
1.75 + * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
1.76 + * of the inherited tuning methods are not useful for it. In
1.77 + * particular, because it acts as a fixed-sized pool using
1.78 + * {@code corePoolSize} threads and an unbounded queue, adjustments
1.79 + * to {@code maximumPoolSize} have no useful effect. Additionally, it
1.80 + * is almost never a good idea to set {@code corePoolSize} to zero or
1.81 + * use {@code allowCoreThreadTimeOut} because this may leave the pool
1.82 + * without threads to handle tasks once they become eligible to run.
1.83 + *
1.84 + * <p><b>Extension notes:</b> This class overrides the
1.85 + * {@link ThreadPoolExecutor#execute execute} and
1.86 + * {@link AbstractExecutorService#submit(Runnable) submit}
1.87 + * methods to generate internal {@link ScheduledFuture} objects to
1.88 + * control per-task delays and scheduling. To preserve
1.89 + * functionality, any further overrides of these methods in
1.90 + * subclasses must invoke superclass versions, which effectively
1.91 + * disables additional task customization. However, this class
1.92 + * provides alternative protected extension method
1.93 + * {@code decorateTask} (one version each for {@code Runnable} and
1.94 + * {@code Callable}) that can be used to customize the concrete task
1.95 + * types used to execute commands entered via {@code execute},
1.96 + * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
1.97 + * and {@code scheduleWithFixedDelay}. By default, a
1.98 + * {@code ScheduledThreadPoolExecutor} uses a task type extending
1.99 + * {@link FutureTask}. However, this may be modified or replaced using
1.100 + * subclasses of the form:
1.101 + *
1.102 + * <pre> {@code
1.103 + * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
1.104 + *
1.105 + * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
1.106 + *
1.107 + * protected <V> RunnableScheduledFuture<V> decorateTask(
1.108 + * Runnable r, RunnableScheduledFuture<V> task) {
1.109 + * return new CustomTask<V>(r, task);
1.110 + * }
1.111 + *
1.112 + * protected <V> RunnableScheduledFuture<V> decorateTask(
1.113 + * Callable<V> c, RunnableScheduledFuture<V> task) {
1.114 + * return new CustomTask<V>(c, task);
1.115 + * }
1.116 + * // ... add constructors, etc.
1.117 + * }}</pre>
1.118 + *
1.119 + * @since 1.5
1.120 + * @author Doug Lea
1.121 + */
1.122 +public class ScheduledThreadPoolExecutor
1.123 + extends ThreadPoolExecutor
1.124 + implements ScheduledExecutorService {
1.125 +
1.126 + /*
1.127 + * This class specializes ThreadPoolExecutor implementation by
1.128 + *
1.129 + * 1. Using a custom task type, ScheduledFutureTask for
1.130 + * tasks, even those that don't require scheduling (i.e.,
1.131 + * those submitted using ExecutorService execute, not
1.132 + * ScheduledExecutorService methods) which are treated as
1.133 + * delayed tasks with a delay of zero.
1.134 + *
1.135 + * 2. Using a custom queue (DelayedWorkQueue), a variant of
1.136 + * unbounded DelayQueue. The lack of capacity constraint and
1.137 + * the fact that corePoolSize and maximumPoolSize are
1.138 + * effectively identical simplifies some execution mechanics
1.139 + * (see delayedExecute) compared to ThreadPoolExecutor.
1.140 + *
1.141 + * 3. Supporting optional run-after-shutdown parameters, which
1.142 + * leads to overrides of shutdown methods to remove and cancel
1.143 + * tasks that should NOT be run after shutdown, as well as
1.144 + * different recheck logic when task (re)submission overlaps
1.145 + * with a shutdown.
1.146 + *
1.147 + * 4. Task decoration methods to allow interception and
1.148 + * instrumentation, which are needed because subclasses cannot
1.149 + * otherwise override submit methods to get this effect. These
1.150 + * don't have any impact on pool control logic though.
1.151 + */
1.152 +
1.153 + /**
1.154 + * False if should cancel/suppress periodic tasks on shutdown.
1.155 + */
1.156 + private volatile boolean continueExistingPeriodicTasksAfterShutdown;
1.157 +
1.158 + /**
1.159 + * False if should cancel non-periodic tasks on shutdown.
1.160 + */
1.161 + private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
1.162 +
1.163 + /**
1.164 + * True if ScheduledFutureTask.cancel should remove from queue
1.165 + */
1.166 + private volatile boolean removeOnCancel = false;
1.167 +
1.168 + /**
1.169 + * Sequence number to break scheduling ties, and in turn to
1.170 + * guarantee FIFO order among tied entries.
1.171 + */
1.172 + private static final AtomicLong sequencer = new AtomicLong(0);
1.173 +
1.174 + /**
1.175 + * Returns current nanosecond time.
1.176 + */
1.177 + final long now() {
1.178 + return System.nanoTime();
1.179 + }
1.180 +
1.181 + private class ScheduledFutureTask<V>
1.182 + extends FutureTask<V> implements RunnableScheduledFuture<V> {
1.183 +
1.184 + /** Sequence number to break ties FIFO */
1.185 + private final long sequenceNumber;
1.186 +
1.187 + /** The time the task is enabled to execute in nanoTime units */
1.188 + private long time;
1.189 +
1.190 + /**
1.191 + * Period in nanoseconds for repeating tasks. A positive
1.192 + * value indicates fixed-rate execution. A negative value
1.193 + * indicates fixed-delay execution. A value of 0 indicates a
1.194 + * non-repeating task.
1.195 + */
1.196 + private final long period;
1.197 +
1.198 + /** The actual task to be re-enqueued by reExecutePeriodic */
1.199 + RunnableScheduledFuture<V> outerTask = this;
1.200 +
1.201 + /**
1.202 + * Index into delay queue, to support faster cancellation.
1.203 + */
1.204 + int heapIndex;
1.205 +
1.206 + /**
1.207 + * Creates a one-shot action with given nanoTime-based trigger time.
1.208 + */
1.209 + ScheduledFutureTask(Runnable r, V result, long ns) {
1.210 + super(r, result);
1.211 + this.time = ns;
1.212 + this.period = 0;
1.213 + this.sequenceNumber = sequencer.getAndIncrement();
1.214 + }
1.215 +
1.216 + /**
1.217 + * Creates a periodic action with given nano time and period.
1.218 + */
1.219 + ScheduledFutureTask(Runnable r, V result, long ns, long period) {
1.220 + super(r, result);
1.221 + this.time = ns;
1.222 + this.period = period;
1.223 + this.sequenceNumber = sequencer.getAndIncrement();
1.224 + }
1.225 +
1.226 + /**
1.227 + * Creates a one-shot action with given nanoTime-based trigger.
1.228 + */
1.229 + ScheduledFutureTask(Callable<V> callable, long ns) {
1.230 + super(callable);
1.231 + this.time = ns;
1.232 + this.period = 0;
1.233 + this.sequenceNumber = sequencer.getAndIncrement();
1.234 + }
1.235 +
1.236 + public long getDelay(TimeUnit unit) {
1.237 + return unit.convert(time - now(), TimeUnit.NANOSECONDS);
1.238 + }
1.239 +
1.240 + public int compareTo(Delayed other) {
1.241 + if (other == this) // compare zero ONLY if same object
1.242 + return 0;
1.243 + if (other instanceof ScheduledFutureTask) {
1.244 + ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
1.245 + long diff = time - x.time;
1.246 + if (diff < 0)
1.247 + return -1;
1.248 + else if (diff > 0)
1.249 + return 1;
1.250 + else if (sequenceNumber < x.sequenceNumber)
1.251 + return -1;
1.252 + else
1.253 + return 1;
1.254 + }
1.255 + long d = (getDelay(TimeUnit.NANOSECONDS) -
1.256 + other.getDelay(TimeUnit.NANOSECONDS));
1.257 + return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
1.258 + }
1.259 +
1.260 + /**
1.261 + * Returns true if this is a periodic (not a one-shot) action.
1.262 + *
1.263 + * @return true if periodic
1.264 + */
1.265 + public boolean isPeriodic() {
1.266 + return period != 0;
1.267 + }
1.268 +
1.269 + /**
1.270 + * Sets the next time to run for a periodic task.
1.271 + */
1.272 + private void setNextRunTime() {
1.273 + long p = period;
1.274 + if (p > 0)
1.275 + time += p;
1.276 + else
1.277 + time = triggerTime(-p);
1.278 + }
1.279 +
1.280 + public boolean cancel(boolean mayInterruptIfRunning) {
1.281 + boolean cancelled = super.cancel(mayInterruptIfRunning);
1.282 + if (cancelled && removeOnCancel && heapIndex >= 0)
1.283 + remove(this);
1.284 + return cancelled;
1.285 + }
1.286 +
1.287 + /**
1.288 + * Overrides FutureTask version so as to reset/requeue if periodic.
1.289 + */
1.290 + public void run() {
1.291 + boolean periodic = isPeriodic();
1.292 + if (!canRunInCurrentRunState(periodic))
1.293 + cancel(false);
1.294 + else if (!periodic)
1.295 + ScheduledFutureTask.super.run();
1.296 + else if (ScheduledFutureTask.super.runAndReset()) {
1.297 + setNextRunTime();
1.298 + reExecutePeriodic(outerTask);
1.299 + }
1.300 + }
1.301 + }
1.302 +
1.303 + /**
1.304 + * Returns true if can run a task given current run state
1.305 + * and run-after-shutdown parameters.
1.306 + *
1.307 + * @param periodic true if this task periodic, false if delayed
1.308 + */
1.309 + boolean canRunInCurrentRunState(boolean periodic) {
1.310 + return isRunningOrShutdown(periodic ?
1.311 + continueExistingPeriodicTasksAfterShutdown :
1.312 + executeExistingDelayedTasksAfterShutdown);
1.313 + }
1.314 +
1.315 + /**
1.316 + * Main execution method for delayed or periodic tasks. If pool
1.317 + * is shut down, rejects the task. Otherwise adds task to queue
1.318 + * and starts a thread, if necessary, to run it. (We cannot
1.319 + * prestart the thread to run the task because the task (probably)
1.320 + * shouldn't be run yet,) If the pool is shut down while the task
1.321 + * is being added, cancel and remove it if required by state and
1.322 + * run-after-shutdown parameters.
1.323 + *
1.324 + * @param task the task
1.325 + */
1.326 + private void delayedExecute(RunnableScheduledFuture<?> task) {
1.327 + if (isShutdown())
1.328 + reject(task);
1.329 + else {
1.330 + super.getQueue().add(task);
1.331 + if (isShutdown() &&
1.332 + !canRunInCurrentRunState(task.isPeriodic()) &&
1.333 + remove(task))
1.334 + task.cancel(false);
1.335 + else
1.336 + prestartCoreThread();
1.337 + }
1.338 + }
1.339 +
1.340 + /**
1.341 + * Requeues a periodic task unless current run state precludes it.
1.342 + * Same idea as delayedExecute except drops task rather than rejecting.
1.343 + *
1.344 + * @param task the task
1.345 + */
1.346 + void reExecutePeriodic(RunnableScheduledFuture<?> task) {
1.347 + if (canRunInCurrentRunState(true)) {
1.348 + super.getQueue().add(task);
1.349 + if (!canRunInCurrentRunState(true) && remove(task))
1.350 + task.cancel(false);
1.351 + else
1.352 + prestartCoreThread();
1.353 + }
1.354 + }
1.355 +
1.356 + /**
1.357 + * Cancels and clears the queue of all tasks that should not be run
1.358 + * due to shutdown policy. Invoked within super.shutdown.
1.359 + */
1.360 + @Override void onShutdown() {
1.361 + BlockingQueue<Runnable> q = super.getQueue();
1.362 + boolean keepDelayed =
1.363 + getExecuteExistingDelayedTasksAfterShutdownPolicy();
1.364 + boolean keepPeriodic =
1.365 + getContinueExistingPeriodicTasksAfterShutdownPolicy();
1.366 + if (!keepDelayed && !keepPeriodic) {
1.367 + for (Object e : q.toArray())
1.368 + if (e instanceof RunnableScheduledFuture<?>)
1.369 + ((RunnableScheduledFuture<?>) e).cancel(false);
1.370 + q.clear();
1.371 + }
1.372 + else {
1.373 + // Traverse snapshot to avoid iterator exceptions
1.374 + for (Object e : q.toArray()) {
1.375 + if (e instanceof RunnableScheduledFuture) {
1.376 + RunnableScheduledFuture<?> t =
1.377 + (RunnableScheduledFuture<?>)e;
1.378 + if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
1.379 + t.isCancelled()) { // also remove if already cancelled
1.380 + if (q.remove(t))
1.381 + t.cancel(false);
1.382 + }
1.383 + }
1.384 + }
1.385 + }
1.386 + tryTerminate();
1.387 + }
1.388 +
1.389 + /**
1.390 + * Modifies or replaces the task used to execute a runnable.
1.391 + * This method can be used to override the concrete
1.392 + * class used for managing internal tasks.
1.393 + * The default implementation simply returns the given task.
1.394 + *
1.395 + * @param runnable the submitted Runnable
1.396 + * @param task the task created to execute the runnable
1.397 + * @return a task that can execute the runnable
1.398 + * @since 1.6
1.399 + */
1.400 + protected <V> RunnableScheduledFuture<V> decorateTask(
1.401 + Runnable runnable, RunnableScheduledFuture<V> task) {
1.402 + return task;
1.403 + }
1.404 +
1.405 + /**
1.406 + * Modifies or replaces the task used to execute a callable.
1.407 + * This method can be used to override the concrete
1.408 + * class used for managing internal tasks.
1.409 + * The default implementation simply returns the given task.
1.410 + *
1.411 + * @param callable the submitted Callable
1.412 + * @param task the task created to execute the callable
1.413 + * @return a task that can execute the callable
1.414 + * @since 1.6
1.415 + */
1.416 + protected <V> RunnableScheduledFuture<V> decorateTask(
1.417 + Callable<V> callable, RunnableScheduledFuture<V> task) {
1.418 + return task;
1.419 + }
1.420 +
1.421 + /**
1.422 + * Creates a new {@code ScheduledThreadPoolExecutor} with the
1.423 + * given core pool size.
1.424 + *
1.425 + * @param corePoolSize the number of threads to keep in the pool, even
1.426 + * if they are idle, unless {@code allowCoreThreadTimeOut} is set
1.427 + * @throws IllegalArgumentException if {@code corePoolSize < 0}
1.428 + */
1.429 + public ScheduledThreadPoolExecutor(int corePoolSize) {
1.430 + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
1.431 + new DelayedWorkQueue());
1.432 + }
1.433 +
1.434 + /**
1.435 + * Creates a new {@code ScheduledThreadPoolExecutor} with the
1.436 + * given initial parameters.
1.437 + *
1.438 + * @param corePoolSize the number of threads to keep in the pool, even
1.439 + * if they are idle, unless {@code allowCoreThreadTimeOut} is set
1.440 + * @param threadFactory the factory to use when the executor
1.441 + * creates a new thread
1.442 + * @throws IllegalArgumentException if {@code corePoolSize < 0}
1.443 + * @throws NullPointerException if {@code threadFactory} is null
1.444 + */
1.445 + public ScheduledThreadPoolExecutor(int corePoolSize,
1.446 + ThreadFactory threadFactory) {
1.447 + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
1.448 + new DelayedWorkQueue(), threadFactory);
1.449 + }
1.450 +
1.451 + /**
1.452 + * Creates a new ScheduledThreadPoolExecutor with the given
1.453 + * initial parameters.
1.454 + *
1.455 + * @param corePoolSize the number of threads to keep in the pool, even
1.456 + * if they are idle, unless {@code allowCoreThreadTimeOut} is set
1.457 + * @param handler the handler to use when execution is blocked
1.458 + * because the thread bounds and queue capacities are reached
1.459 + * @throws IllegalArgumentException if {@code corePoolSize < 0}
1.460 + * @throws NullPointerException if {@code handler} is null
1.461 + */
1.462 + public ScheduledThreadPoolExecutor(int corePoolSize,
1.463 + RejectedExecutionHandler handler) {
1.464 + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
1.465 + new DelayedWorkQueue(), handler);
1.466 + }
1.467 +
1.468 + /**
1.469 + * Creates a new ScheduledThreadPoolExecutor with the given
1.470 + * initial parameters.
1.471 + *
1.472 + * @param corePoolSize the number of threads to keep in the pool, even
1.473 + * if they are idle, unless {@code allowCoreThreadTimeOut} is set
1.474 + * @param threadFactory the factory to use when the executor
1.475 + * creates a new thread
1.476 + * @param handler the handler to use when execution is blocked
1.477 + * because the thread bounds and queue capacities are reached
1.478 + * @throws IllegalArgumentException if {@code corePoolSize < 0}
1.479 + * @throws NullPointerException if {@code threadFactory} or
1.480 + * {@code handler} is null
1.481 + */
1.482 + public ScheduledThreadPoolExecutor(int corePoolSize,
1.483 + ThreadFactory threadFactory,
1.484 + RejectedExecutionHandler handler) {
1.485 + super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
1.486 + new DelayedWorkQueue(), threadFactory, handler);
1.487 + }
1.488 +
1.489 + /**
1.490 + * Returns the trigger time of a delayed action.
1.491 + */
1.492 + private long triggerTime(long delay, TimeUnit unit) {
1.493 + return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
1.494 + }
1.495 +
1.496 + /**
1.497 + * Returns the trigger time of a delayed action.
1.498 + */
1.499 + long triggerTime(long delay) {
1.500 + return now() +
1.501 + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
1.502 + }
1.503 +
1.504 + /**
1.505 + * Constrains the values of all delays in the queue to be within
1.506 + * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
1.507 + * This may occur if a task is eligible to be dequeued, but has
1.508 + * not yet been, while some other task is added with a delay of
1.509 + * Long.MAX_VALUE.
1.510 + */
1.511 + private long overflowFree(long delay) {
1.512 + Delayed head = (Delayed) super.getQueue().peek();
1.513 + if (head != null) {
1.514 + long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
1.515 + if (headDelay < 0 && (delay - headDelay < 0))
1.516 + delay = Long.MAX_VALUE + headDelay;
1.517 + }
1.518 + return delay;
1.519 + }
1.520 +
1.521 + /**
1.522 + * @throws RejectedExecutionException {@inheritDoc}
1.523 + * @throws NullPointerException {@inheritDoc}
1.524 + */
1.525 + public ScheduledFuture<?> schedule(Runnable command,
1.526 + long delay,
1.527 + TimeUnit unit) {
1.528 + if (command == null || unit == null)
1.529 + throw new NullPointerException();
1.530 + RunnableScheduledFuture<?> t = decorateTask(command,
1.531 + new ScheduledFutureTask<Void>(command, null,
1.532 + triggerTime(delay, unit)));
1.533 + delayedExecute(t);
1.534 + return t;
1.535 + }
1.536 +
1.537 + /**
1.538 + * @throws RejectedExecutionException {@inheritDoc}
1.539 + * @throws NullPointerException {@inheritDoc}
1.540 + */
1.541 + public <V> ScheduledFuture<V> schedule(Callable<V> callable,
1.542 + long delay,
1.543 + TimeUnit unit) {
1.544 + if (callable == null || unit == null)
1.545 + throw new NullPointerException();
1.546 + RunnableScheduledFuture<V> t = decorateTask(callable,
1.547 + new ScheduledFutureTask<V>(callable,
1.548 + triggerTime(delay, unit)));
1.549 + delayedExecute(t);
1.550 + return t;
1.551 + }
1.552 +
1.553 + /**
1.554 + * @throws RejectedExecutionException {@inheritDoc}
1.555 + * @throws NullPointerException {@inheritDoc}
1.556 + * @throws IllegalArgumentException {@inheritDoc}
1.557 + */
1.558 + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
1.559 + long initialDelay,
1.560 + long period,
1.561 + TimeUnit unit) {
1.562 + if (command == null || unit == null)
1.563 + throw new NullPointerException();
1.564 + if (period <= 0)
1.565 + throw new IllegalArgumentException();
1.566 + ScheduledFutureTask<Void> sft =
1.567 + new ScheduledFutureTask<Void>(command,
1.568 + null,
1.569 + triggerTime(initialDelay, unit),
1.570 + unit.toNanos(period));
1.571 + RunnableScheduledFuture<Void> t = decorateTask(command, sft);
1.572 + sft.outerTask = t;
1.573 + delayedExecute(t);
1.574 + return t;
1.575 + }
1.576 +
1.577 + /**
1.578 + * @throws RejectedExecutionException {@inheritDoc}
1.579 + * @throws NullPointerException {@inheritDoc}
1.580 + * @throws IllegalArgumentException {@inheritDoc}
1.581 + */
1.582 + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
1.583 + long initialDelay,
1.584 + long delay,
1.585 + TimeUnit unit) {
1.586 + if (command == null || unit == null)
1.587 + throw new NullPointerException();
1.588 + if (delay <= 0)
1.589 + throw new IllegalArgumentException();
1.590 + ScheduledFutureTask<Void> sft =
1.591 + new ScheduledFutureTask<Void>(command,
1.592 + null,
1.593 + triggerTime(initialDelay, unit),
1.594 + unit.toNanos(-delay));
1.595 + RunnableScheduledFuture<Void> t = decorateTask(command, sft);
1.596 + sft.outerTask = t;
1.597 + delayedExecute(t);
1.598 + return t;
1.599 + }
1.600 +
1.601 + /**
1.602 + * Executes {@code command} with zero required delay.
1.603 + * This has effect equivalent to
1.604 + * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
1.605 + * Note that inspections of the queue and of the list returned by
1.606 + * {@code shutdownNow} will access the zero-delayed
1.607 + * {@link ScheduledFuture}, not the {@code command} itself.
1.608 + *
1.609 + * <p>A consequence of the use of {@code ScheduledFuture} objects is
1.610 + * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
1.611 + * called with a null second {@code Throwable} argument, even if the
1.612 + * {@code command} terminated abruptly. Instead, the {@code Throwable}
1.613 + * thrown by such a task can be obtained via {@link Future#get}.
1.614 + *
1.615 + * @throws RejectedExecutionException at discretion of
1.616 + * {@code RejectedExecutionHandler}, if the task
1.617 + * cannot be accepted for execution because the
1.618 + * executor has been shut down
1.619 + * @throws NullPointerException {@inheritDoc}
1.620 + */
1.621 + public void execute(Runnable command) {
1.622 + schedule(command, 0, TimeUnit.NANOSECONDS);
1.623 + }
1.624 +
1.625 + // Override AbstractExecutorService methods
1.626 +
1.627 + /**
1.628 + * @throws RejectedExecutionException {@inheritDoc}
1.629 + * @throws NullPointerException {@inheritDoc}
1.630 + */
1.631 + public Future<?> submit(Runnable task) {
1.632 + return schedule(task, 0, TimeUnit.NANOSECONDS);
1.633 + }
1.634 +
1.635 + /**
1.636 + * @throws RejectedExecutionException {@inheritDoc}
1.637 + * @throws NullPointerException {@inheritDoc}
1.638 + */
1.639 + public <T> Future<T> submit(Runnable task, T result) {
1.640 + return schedule(Executors.callable(task, result),
1.641 + 0, TimeUnit.NANOSECONDS);
1.642 + }
1.643 +
1.644 + /**
1.645 + * @throws RejectedExecutionException {@inheritDoc}
1.646 + * @throws NullPointerException {@inheritDoc}
1.647 + */
1.648 + public <T> Future<T> submit(Callable<T> task) {
1.649 + return schedule(task, 0, TimeUnit.NANOSECONDS);
1.650 + }
1.651 +
1.652 + /**
1.653 + * Sets the policy on whether to continue executing existing
1.654 + * periodic tasks even when this executor has been {@code shutdown}.
1.655 + * In this case, these tasks will only terminate upon
1.656 + * {@code shutdownNow} or after setting the policy to
1.657 + * {@code false} when already shutdown.
1.658 + * This value is by default {@code false}.
1.659 + *
1.660 + * @param value if {@code true}, continue after shutdown, else don't.
1.661 + * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
1.662 + */
1.663 + public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
1.664 + continueExistingPeriodicTasksAfterShutdown = value;
1.665 + if (!value && isShutdown())
1.666 + onShutdown();
1.667 + }
1.668 +
1.669 + /**
1.670 + * Gets the policy on whether to continue executing existing
1.671 + * periodic tasks even when this executor has been {@code shutdown}.
1.672 + * In this case, these tasks will only terminate upon
1.673 + * {@code shutdownNow} or after setting the policy to
1.674 + * {@code false} when already shutdown.
1.675 + * This value is by default {@code false}.
1.676 + *
1.677 + * @return {@code true} if will continue after shutdown
1.678 + * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
1.679 + */
1.680 + public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
1.681 + return continueExistingPeriodicTasksAfterShutdown;
1.682 + }
1.683 +
1.684 + /**
1.685 + * Sets the policy on whether to execute existing delayed
1.686 + * tasks even when this executor has been {@code shutdown}.
1.687 + * In this case, these tasks will only terminate upon
1.688 + * {@code shutdownNow}, or after setting the policy to
1.689 + * {@code false} when already shutdown.
1.690 + * This value is by default {@code true}.
1.691 + *
1.692 + * @param value if {@code true}, execute after shutdown, else don't.
1.693 + * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
1.694 + */
1.695 + public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
1.696 + executeExistingDelayedTasksAfterShutdown = value;
1.697 + if (!value && isShutdown())
1.698 + onShutdown();
1.699 + }
1.700 +
1.701 + /**
1.702 + * Gets the policy on whether to execute existing delayed
1.703 + * tasks even when this executor has been {@code shutdown}.
1.704 + * In this case, these tasks will only terminate upon
1.705 + * {@code shutdownNow}, or after setting the policy to
1.706 + * {@code false} when already shutdown.
1.707 + * This value is by default {@code true}.
1.708 + *
1.709 + * @return {@code true} if will execute after shutdown
1.710 + * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
1.711 + */
1.712 + public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
1.713 + return executeExistingDelayedTasksAfterShutdown;
1.714 + }
1.715 +
1.716 + /**
1.717 + * Sets the policy on whether cancelled tasks should be immediately
1.718 + * removed from the work queue at time of cancellation. This value is
1.719 + * by default {@code false}.
1.720 + *
1.721 + * @param value if {@code true}, remove on cancellation, else don't
1.722 + * @see #getRemoveOnCancelPolicy
1.723 + * @since 1.7
1.724 + */
1.725 + public void setRemoveOnCancelPolicy(boolean value) {
1.726 + removeOnCancel = value;
1.727 + }
1.728 +
1.729 + /**
1.730 + * Gets the policy on whether cancelled tasks should be immediately
1.731 + * removed from the work queue at time of cancellation. This value is
1.732 + * by default {@code false}.
1.733 + *
1.734 + * @return {@code true} if cancelled tasks are immediately removed
1.735 + * from the queue
1.736 + * @see #setRemoveOnCancelPolicy
1.737 + * @since 1.7
1.738 + */
1.739 + public boolean getRemoveOnCancelPolicy() {
1.740 + return removeOnCancel;
1.741 + }
1.742 +
1.743 + /**
1.744 + * Initiates an orderly shutdown in which previously submitted
1.745 + * tasks are executed, but no new tasks will be accepted.
1.746 + * Invocation has no additional effect if already shut down.
1.747 + *
1.748 + * <p>This method does not wait for previously submitted tasks to
1.749 + * complete execution. Use {@link #awaitTermination awaitTermination}
1.750 + * to do that.
1.751 + *
1.752 + * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
1.753 + * has been set {@code false}, existing delayed tasks whose delays
1.754 + * have not yet elapsed are cancelled. And unless the {@code
1.755 + * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
1.756 + * {@code true}, future executions of existing periodic tasks will
1.757 + * be cancelled.
1.758 + *
1.759 + * @throws SecurityException {@inheritDoc}
1.760 + */
1.761 + public void shutdown() {
1.762 + super.shutdown();
1.763 + }
1.764 +
1.765 + /**
1.766 + * Attempts to stop all actively executing tasks, halts the
1.767 + * processing of waiting tasks, and returns a list of the tasks
1.768 + * that were awaiting execution.
1.769 + *
1.770 + * <p>This method does not wait for actively executing tasks to
1.771 + * terminate. Use {@link #awaitTermination awaitTermination} to
1.772 + * do that.
1.773 + *
1.774 + * <p>There are no guarantees beyond best-effort attempts to stop
1.775 + * processing actively executing tasks. This implementation
1.776 + * cancels tasks via {@link Thread#interrupt}, so any task that
1.777 + * fails to respond to interrupts may never terminate.
1.778 + *
1.779 + * @return list of tasks that never commenced execution.
1.780 + * Each element of this list is a {@link ScheduledFuture},
1.781 + * including those tasks submitted using {@code execute},
1.782 + * which are for scheduling purposes used as the basis of a
1.783 + * zero-delay {@code ScheduledFuture}.
1.784 + * @throws SecurityException {@inheritDoc}
1.785 + */
1.786 + public List<Runnable> shutdownNow() {
1.787 + return super.shutdownNow();
1.788 + }
1.789 +
1.790 + /**
1.791 + * Returns the task queue used by this executor. Each element of
1.792 + * this queue is a {@link ScheduledFuture}, including those
1.793 + * tasks submitted using {@code execute} which are for scheduling
1.794 + * purposes used as the basis of a zero-delay
1.795 + * {@code ScheduledFuture}. Iteration over this queue is
1.796 + * <em>not</em> guaranteed to traverse tasks in the order in
1.797 + * which they will execute.
1.798 + *
1.799 + * @return the task queue
1.800 + */
1.801 + public BlockingQueue<Runnable> getQueue() {
1.802 + return super.getQueue();
1.803 + }
1.804 +
1.805 + /**
1.806 + * Specialized delay queue. To mesh with TPE declarations, this
1.807 + * class must be declared as a BlockingQueue<Runnable> even though
1.808 + * it can only hold RunnableScheduledFutures.
1.809 + */
1.810 + static class DelayedWorkQueue extends AbstractQueue<Runnable>
1.811 + implements BlockingQueue<Runnable> {
1.812 +
1.813 + /*
1.814 + * A DelayedWorkQueue is based on a heap-based data structure
1.815 + * like those in DelayQueue and PriorityQueue, except that
1.816 + * every ScheduledFutureTask also records its index into the
1.817 + * heap array. This eliminates the need to find a task upon
1.818 + * cancellation, greatly speeding up removal (down from O(n)
1.819 + * to O(log n)), and reducing garbage retention that would
1.820 + * otherwise occur by waiting for the element to rise to top
1.821 + * before clearing. But because the queue may also hold
1.822 + * RunnableScheduledFutures that are not ScheduledFutureTasks,
1.823 + * we are not guaranteed to have such indices available, in
1.824 + * which case we fall back to linear search. (We expect that
1.825 + * most tasks will not be decorated, and that the faster cases
1.826 + * will be much more common.)
1.827 + *
1.828 + * All heap operations must record index changes -- mainly
1.829 + * within siftUp and siftDown. Upon removal, a task's
1.830 + * heapIndex is set to -1. Note that ScheduledFutureTasks can
1.831 + * appear at most once in the queue (this need not be true for
1.832 + * other kinds of tasks or work queues), so are uniquely
1.833 + * identified by heapIndex.
1.834 + */
1.835 +
1.836 + private static final int INITIAL_CAPACITY = 16;
1.837 + private RunnableScheduledFuture[] queue =
1.838 + new RunnableScheduledFuture[INITIAL_CAPACITY];
1.839 + private final ReentrantLock lock = new ReentrantLock();
1.840 + private int size = 0;
1.841 +
1.842 + /**
1.843 + * Thread designated to wait for the task at the head of the
1.844 + * queue. This variant of the Leader-Follower pattern
1.845 + * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
1.846 + * minimize unnecessary timed waiting. When a thread becomes
1.847 + * the leader, it waits only for the next delay to elapse, but
1.848 + * other threads await indefinitely. The leader thread must
1.849 + * signal some other thread before returning from take() or
1.850 + * poll(...), unless some other thread becomes leader in the
1.851 + * interim. Whenever the head of the queue is replaced with a
1.852 + * task with an earlier expiration time, the leader field is
1.853 + * invalidated by being reset to null, and some waiting
1.854 + * thread, but not necessarily the current leader, is
1.855 + * signalled. So waiting threads must be prepared to acquire
1.856 + * and lose leadership while waiting.
1.857 + */
1.858 + private Thread leader = null;
1.859 +
1.860 + /**
1.861 + * Condition signalled when a newer task becomes available at the
1.862 + * head of the queue or a new thread may need to become leader.
1.863 + */
1.864 + private final Condition available = lock.newCondition();
1.865 +
1.866 + /**
1.867 + * Set f's heapIndex if it is a ScheduledFutureTask.
1.868 + */
1.869 + private void setIndex(RunnableScheduledFuture f, int idx) {
1.870 + if (f instanceof ScheduledFutureTask)
1.871 + ((ScheduledFutureTask)f).heapIndex = idx;
1.872 + }
1.873 +
1.874 + /**
1.875 + * Sift element added at bottom up to its heap-ordered spot.
1.876 + * Call only when holding lock.
1.877 + */
1.878 + private void siftUp(int k, RunnableScheduledFuture key) {
1.879 + while (k > 0) {
1.880 + int parent = (k - 1) >>> 1;
1.881 + RunnableScheduledFuture e = queue[parent];
1.882 + if (key.compareTo(e) >= 0)
1.883 + break;
1.884 + queue[k] = e;
1.885 + setIndex(e, k);
1.886 + k = parent;
1.887 + }
1.888 + queue[k] = key;
1.889 + setIndex(key, k);
1.890 + }
1.891 +
1.892 + /**
1.893 + * Sift element added at top down to its heap-ordered spot.
1.894 + * Call only when holding lock.
1.895 + */
1.896 + private void siftDown(int k, RunnableScheduledFuture key) {
1.897 + int half = size >>> 1;
1.898 + while (k < half) {
1.899 + int child = (k << 1) + 1;
1.900 + RunnableScheduledFuture c = queue[child];
1.901 + int right = child + 1;
1.902 + if (right < size && c.compareTo(queue[right]) > 0)
1.903 + c = queue[child = right];
1.904 + if (key.compareTo(c) <= 0)
1.905 + break;
1.906 + queue[k] = c;
1.907 + setIndex(c, k);
1.908 + k = child;
1.909 + }
1.910 + queue[k] = key;
1.911 + setIndex(key, k);
1.912 + }
1.913 +
1.914 + /**
1.915 + * Resize the heap array. Call only when holding lock.
1.916 + */
1.917 + private void grow() {
1.918 + int oldCapacity = queue.length;
1.919 + int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
1.920 + if (newCapacity < 0) // overflow
1.921 + newCapacity = Integer.MAX_VALUE;
1.922 + queue = Arrays.copyOf(queue, newCapacity);
1.923 + }
1.924 +
1.925 + /**
1.926 + * Find index of given object, or -1 if absent
1.927 + */
1.928 + private int indexOf(Object x) {
1.929 + if (x != null) {
1.930 + if (x instanceof ScheduledFutureTask) {
1.931 + int i = ((ScheduledFutureTask) x).heapIndex;
1.932 + // Sanity check; x could conceivably be a
1.933 + // ScheduledFutureTask from some other pool.
1.934 + if (i >= 0 && i < size && queue[i] == x)
1.935 + return i;
1.936 + } else {
1.937 + for (int i = 0; i < size; i++)
1.938 + if (x.equals(queue[i]))
1.939 + return i;
1.940 + }
1.941 + }
1.942 + return -1;
1.943 + }
1.944 +
1.945 + public boolean contains(Object x) {
1.946 + final ReentrantLock lock = this.lock;
1.947 + lock.lock();
1.948 + try {
1.949 + return indexOf(x) != -1;
1.950 + } finally {
1.951 + lock.unlock();
1.952 + }
1.953 + }
1.954 +
1.955 + public boolean remove(Object x) {
1.956 + final ReentrantLock lock = this.lock;
1.957 + lock.lock();
1.958 + try {
1.959 + int i = indexOf(x);
1.960 + if (i < 0)
1.961 + return false;
1.962 +
1.963 + setIndex(queue[i], -1);
1.964 + int s = --size;
1.965 + RunnableScheduledFuture replacement = queue[s];
1.966 + queue[s] = null;
1.967 + if (s != i) {
1.968 + siftDown(i, replacement);
1.969 + if (queue[i] == replacement)
1.970 + siftUp(i, replacement);
1.971 + }
1.972 + return true;
1.973 + } finally {
1.974 + lock.unlock();
1.975 + }
1.976 + }
1.977 +
1.978 + public int size() {
1.979 + final ReentrantLock lock = this.lock;
1.980 + lock.lock();
1.981 + try {
1.982 + return size;
1.983 + } finally {
1.984 + lock.unlock();
1.985 + }
1.986 + }
1.987 +
1.988 + public boolean isEmpty() {
1.989 + return size() == 0;
1.990 + }
1.991 +
1.992 + public int remainingCapacity() {
1.993 + return Integer.MAX_VALUE;
1.994 + }
1.995 +
1.996 + public RunnableScheduledFuture peek() {
1.997 + final ReentrantLock lock = this.lock;
1.998 + lock.lock();
1.999 + try {
1.1000 + return queue[0];
1.1001 + } finally {
1.1002 + lock.unlock();
1.1003 + }
1.1004 + }
1.1005 +
1.1006 + public boolean offer(Runnable x) {
1.1007 + if (x == null)
1.1008 + throw new NullPointerException();
1.1009 + RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1.1010 + final ReentrantLock lock = this.lock;
1.1011 + lock.lock();
1.1012 + try {
1.1013 + int i = size;
1.1014 + if (i >= queue.length)
1.1015 + grow();
1.1016 + size = i + 1;
1.1017 + if (i == 0) {
1.1018 + queue[0] = e;
1.1019 + setIndex(e, 0);
1.1020 + } else {
1.1021 + siftUp(i, e);
1.1022 + }
1.1023 + if (queue[0] == e) {
1.1024 + leader = null;
1.1025 + available.signal();
1.1026 + }
1.1027 + } finally {
1.1028 + lock.unlock();
1.1029 + }
1.1030 + return true;
1.1031 + }
1.1032 +
1.1033 + public void put(Runnable e) {
1.1034 + offer(e);
1.1035 + }
1.1036 +
1.1037 + public boolean add(Runnable e) {
1.1038 + return offer(e);
1.1039 + }
1.1040 +
1.1041 + public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1.1042 + return offer(e);
1.1043 + }
1.1044 +
1.1045 + /**
1.1046 + * Performs common bookkeeping for poll and take: Replaces
1.1047 + * first element with last and sifts it down. Call only when
1.1048 + * holding lock.
1.1049 + * @param f the task to remove and return
1.1050 + */
1.1051 + private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1.1052 + int s = --size;
1.1053 + RunnableScheduledFuture x = queue[s];
1.1054 + queue[s] = null;
1.1055 + if (s != 0)
1.1056 + siftDown(0, x);
1.1057 + setIndex(f, -1);
1.1058 + return f;
1.1059 + }
1.1060 +
1.1061 + public RunnableScheduledFuture poll() {
1.1062 + final ReentrantLock lock = this.lock;
1.1063 + lock.lock();
1.1064 + try {
1.1065 + RunnableScheduledFuture first = queue[0];
1.1066 + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1.1067 + return null;
1.1068 + else
1.1069 + return finishPoll(first);
1.1070 + } finally {
1.1071 + lock.unlock();
1.1072 + }
1.1073 + }
1.1074 +
1.1075 + public RunnableScheduledFuture take() throws InterruptedException {
1.1076 + final ReentrantLock lock = this.lock;
1.1077 + lock.lockInterruptibly();
1.1078 + try {
1.1079 + for (;;) {
1.1080 + RunnableScheduledFuture first = queue[0];
1.1081 + if (first == null)
1.1082 + available.await();
1.1083 + else {
1.1084 + long delay = first.getDelay(TimeUnit.NANOSECONDS);
1.1085 + if (delay <= 0)
1.1086 + return finishPoll(first);
1.1087 + else if (leader != null)
1.1088 + available.await();
1.1089 + else {
1.1090 + Thread thisThread = Thread.currentThread();
1.1091 + leader = thisThread;
1.1092 + try {
1.1093 + available.awaitNanos(delay);
1.1094 + } finally {
1.1095 + if (leader == thisThread)
1.1096 + leader = null;
1.1097 + }
1.1098 + }
1.1099 + }
1.1100 + }
1.1101 + } finally {
1.1102 + if (leader == null && queue[0] != null)
1.1103 + available.signal();
1.1104 + lock.unlock();
1.1105 + }
1.1106 + }
1.1107 +
1.1108 + public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1.1109 + throws InterruptedException {
1.1110 + long nanos = unit.toNanos(timeout);
1.1111 + final ReentrantLock lock = this.lock;
1.1112 + lock.lockInterruptibly();
1.1113 + try {
1.1114 + for (;;) {
1.1115 + RunnableScheduledFuture first = queue[0];
1.1116 + if (first == null) {
1.1117 + if (nanos <= 0)
1.1118 + return null;
1.1119 + else
1.1120 + nanos = available.awaitNanos(nanos);
1.1121 + } else {
1.1122 + long delay = first.getDelay(TimeUnit.NANOSECONDS);
1.1123 + if (delay <= 0)
1.1124 + return finishPoll(first);
1.1125 + if (nanos <= 0)
1.1126 + return null;
1.1127 + if (nanos < delay || leader != null)
1.1128 + nanos = available.awaitNanos(nanos);
1.1129 + else {
1.1130 + Thread thisThread = Thread.currentThread();
1.1131 + leader = thisThread;
1.1132 + try {
1.1133 + long timeLeft = available.awaitNanos(delay);
1.1134 + nanos -= delay - timeLeft;
1.1135 + } finally {
1.1136 + if (leader == thisThread)
1.1137 + leader = null;
1.1138 + }
1.1139 + }
1.1140 + }
1.1141 + }
1.1142 + } finally {
1.1143 + if (leader == null && queue[0] != null)
1.1144 + available.signal();
1.1145 + lock.unlock();
1.1146 + }
1.1147 + }
1.1148 +
1.1149 + public void clear() {
1.1150 + final ReentrantLock lock = this.lock;
1.1151 + lock.lock();
1.1152 + try {
1.1153 + for (int i = 0; i < size; i++) {
1.1154 + RunnableScheduledFuture t = queue[i];
1.1155 + if (t != null) {
1.1156 + queue[i] = null;
1.1157 + setIndex(t, -1);
1.1158 + }
1.1159 + }
1.1160 + size = 0;
1.1161 + } finally {
1.1162 + lock.unlock();
1.1163 + }
1.1164 + }
1.1165 +
1.1166 + /**
1.1167 + * Return and remove first element only if it is expired.
1.1168 + * Used only by drainTo. Call only when holding lock.
1.1169 + */
1.1170 + private RunnableScheduledFuture pollExpired() {
1.1171 + RunnableScheduledFuture first = queue[0];
1.1172 + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1.1173 + return null;
1.1174 + return finishPoll(first);
1.1175 + }
1.1176 +
1.1177 + public int drainTo(Collection<? super Runnable> c) {
1.1178 + if (c == null)
1.1179 + throw new NullPointerException();
1.1180 + if (c == this)
1.1181 + throw new IllegalArgumentException();
1.1182 + final ReentrantLock lock = this.lock;
1.1183 + lock.lock();
1.1184 + try {
1.1185 + RunnableScheduledFuture first;
1.1186 + int n = 0;
1.1187 + while ((first = pollExpired()) != null) {
1.1188 + c.add(first);
1.1189 + ++n;
1.1190 + }
1.1191 + return n;
1.1192 + } finally {
1.1193 + lock.unlock();
1.1194 + }
1.1195 + }
1.1196 +
1.1197 + public int drainTo(Collection<? super Runnable> c, int maxElements) {
1.1198 + if (c == null)
1.1199 + throw new NullPointerException();
1.1200 + if (c == this)
1.1201 + throw new IllegalArgumentException();
1.1202 + if (maxElements <= 0)
1.1203 + return 0;
1.1204 + final ReentrantLock lock = this.lock;
1.1205 + lock.lock();
1.1206 + try {
1.1207 + RunnableScheduledFuture first;
1.1208 + int n = 0;
1.1209 + while (n < maxElements && (first = pollExpired()) != null) {
1.1210 + c.add(first);
1.1211 + ++n;
1.1212 + }
1.1213 + return n;
1.1214 + } finally {
1.1215 + lock.unlock();
1.1216 + }
1.1217 + }
1.1218 +
1.1219 + public Object[] toArray() {
1.1220 + final ReentrantLock lock = this.lock;
1.1221 + lock.lock();
1.1222 + try {
1.1223 + return Arrays.copyOf(queue, size, Object[].class);
1.1224 + } finally {
1.1225 + lock.unlock();
1.1226 + }
1.1227 + }
1.1228 +
1.1229 + @SuppressWarnings("unchecked")
1.1230 + public <T> T[] toArray(T[] a) {
1.1231 + final ReentrantLock lock = this.lock;
1.1232 + lock.lock();
1.1233 + try {
1.1234 + if (a.length < size)
1.1235 + return (T[]) Arrays.copyOf(queue, size, a.getClass());
1.1236 + System.arraycopy(queue, 0, a, 0, size);
1.1237 + if (a.length > size)
1.1238 + a[size] = null;
1.1239 + return a;
1.1240 + } finally {
1.1241 + lock.unlock();
1.1242 + }
1.1243 + }
1.1244 +
1.1245 + public Iterator<Runnable> iterator() {
1.1246 + return new Itr(Arrays.copyOf(queue, size));
1.1247 + }
1.1248 +
1.1249 + /**
1.1250 + * Snapshot iterator that works off copy of underlying q array.
1.1251 + */
1.1252 + private class Itr implements Iterator<Runnable> {
1.1253 + final RunnableScheduledFuture[] array;
1.1254 + int cursor = 0; // index of next element to return
1.1255 + int lastRet = -1; // index of last element, or -1 if no such
1.1256 +
1.1257 + Itr(RunnableScheduledFuture[] array) {
1.1258 + this.array = array;
1.1259 + }
1.1260 +
1.1261 + public boolean hasNext() {
1.1262 + return cursor < array.length;
1.1263 + }
1.1264 +
1.1265 + public Runnable next() {
1.1266 + if (cursor >= array.length)
1.1267 + throw new NoSuchElementException();
1.1268 + lastRet = cursor;
1.1269 + return array[cursor++];
1.1270 + }
1.1271 +
1.1272 + public void remove() {
1.1273 + if (lastRet < 0)
1.1274 + throw new IllegalStateException();
1.1275 + DelayedWorkQueue.this.remove(array[lastRet]);
1.1276 + lastRet = -1;
1.1277 + }
1.1278 + }
1.1279 + }
1.1280 +}