rt/emul/compact/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
branchjdk7-b147
changeset 1890 212417b74b72
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java	Sat Mar 19 10:46:31 2016 +0100
     1.3 @@ -0,0 +1,1277 @@
     1.4 +/*
     1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     1.6 + *
     1.7 + * This code is free software; you can redistribute it and/or modify it
     1.8 + * under the terms of the GNU General Public License version 2 only, as
     1.9 + * published by the Free Software Foundation.  Oracle designates this
    1.10 + * particular file as subject to the "Classpath" exception as provided
    1.11 + * by Oracle in the LICENSE file that accompanied this code.
    1.12 + *
    1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
    1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    1.15 + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    1.16 + * version 2 for more details (a copy is included in the LICENSE file that
    1.17 + * accompanied this code).
    1.18 + *
    1.19 + * You should have received a copy of the GNU General Public License version
    1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
    1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    1.22 + *
    1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    1.24 + * or visit www.oracle.com if you need additional information or have any
    1.25 + * questions.
    1.26 + */
    1.27 +
    1.28 +/*
    1.29 + * This file is available under and governed by the GNU General Public
    1.30 + * License version 2 only, as published by the Free Software Foundation.
    1.31 + * However, the following notice accompanied the original version of this
    1.32 + * file:
    1.33 + *
    1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
    1.35 + * Expert Group and released to the public domain, as explained at
    1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
    1.37 + */
    1.38 +
    1.39 +package java.util.concurrent;
    1.40 +import java.util.concurrent.atomic.*;
    1.41 +import java.util.concurrent.locks.*;
    1.42 +import java.util.*;
    1.43 +
    1.44 +/**
    1.45 + * A {@link ThreadPoolExecutor} that can additionally schedule
    1.46 + * commands to run after a given delay, or to execute
    1.47 + * periodically. This class is preferable to {@link java.util.Timer}
    1.48 + * when multiple worker threads are needed, or when the additional
    1.49 + * flexibility or capabilities of {@link ThreadPoolExecutor} (which
    1.50 + * this class extends) are required.
    1.51 + *
    1.52 + * <p>Delayed tasks execute no sooner than they are enabled, but
    1.53 + * without any real-time guarantees about when, after they are
    1.54 + * enabled, they will commence. Tasks scheduled for exactly the same
    1.55 + * execution time are enabled in first-in-first-out (FIFO) order of
    1.56 + * submission.
    1.57 + *
    1.58 + * <p>When a submitted task is cancelled before it is run, execution
    1.59 + * is suppressed. By default, such a cancelled task is not
    1.60 + * automatically removed from the work queue until its delay
    1.61 + * elapses. While this enables further inspection and monitoring, it
    1.62 + * may also cause unbounded retention of cancelled tasks. To avoid
    1.63 + * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
    1.64 + * causes tasks to be immediately removed from the work queue at
    1.65 + * time of cancellation.
    1.66 + *
    1.67 + * <p>Successive executions of a task scheduled via
    1.68 + * {@code scheduleAtFixedRate} or
    1.69 + * {@code scheduleWithFixedDelay} do not overlap. While different
    1.70 + * executions may be performed by different threads, the effects of
    1.71 + * prior executions <a
    1.72 + * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
    1.73 + * those of subsequent ones.
    1.74 + *
    1.75 + * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
    1.76 + * of the inherited tuning methods are not useful for it. In
    1.77 + * particular, because it acts as a fixed-sized pool using
    1.78 + * {@code corePoolSize} threads and an unbounded queue, adjustments
    1.79 + * to {@code maximumPoolSize} have no useful effect. Additionally, it
    1.80 + * is almost never a good idea to set {@code corePoolSize} to zero or
    1.81 + * use {@code allowCoreThreadTimeOut} because this may leave the pool
    1.82 + * without threads to handle tasks once they become eligible to run.
    1.83 + *
    1.84 + * <p><b>Extension notes:</b> This class overrides the
    1.85 + * {@link ThreadPoolExecutor#execute execute} and
    1.86 + * {@link AbstractExecutorService#submit(Runnable) submit}
    1.87 + * methods to generate internal {@link ScheduledFuture} objects to
    1.88 + * control per-task delays and scheduling.  To preserve
    1.89 + * functionality, any further overrides of these methods in
    1.90 + * subclasses must invoke superclass versions, which effectively
    1.91 + * disables additional task customization.  However, this class
    1.92 + * provides alternative protected extension method
    1.93 + * {@code decorateTask} (one version each for {@code Runnable} and
    1.94 + * {@code Callable}) that can be used to customize the concrete task
    1.95 + * types used to execute commands entered via {@code execute},
    1.96 + * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
    1.97 + * and {@code scheduleWithFixedDelay}.  By default, a
    1.98 + * {@code ScheduledThreadPoolExecutor} uses a task type extending
    1.99 + * {@link FutureTask}. However, this may be modified or replaced using
   1.100 + * subclasses of the form:
   1.101 + *
   1.102 + *  <pre> {@code
   1.103 + * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
   1.104 + *
   1.105 + *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
   1.106 + *
   1.107 + *   protected <V> RunnableScheduledFuture<V> decorateTask(
   1.108 + *                Runnable r, RunnableScheduledFuture<V> task) {
   1.109 + *       return new CustomTask<V>(r, task);
   1.110 + *   }
   1.111 + *
   1.112 + *   protected <V> RunnableScheduledFuture<V> decorateTask(
   1.113 + *                Callable<V> c, RunnableScheduledFuture<V> task) {
   1.114 + *       return new CustomTask<V>(c, task);
   1.115 + *   }
   1.116 + *   // ... add constructors, etc.
   1.117 + * }}</pre>
   1.118 + *
   1.119 + * @since 1.5
   1.120 + * @author Doug Lea
   1.121 + */
   1.122 +public class ScheduledThreadPoolExecutor
   1.123 +        extends ThreadPoolExecutor
   1.124 +        implements ScheduledExecutorService {
   1.125 +
   1.126 +    /*
   1.127 +     * This class specializes ThreadPoolExecutor implementation by
   1.128 +     *
   1.129 +     * 1. Using a custom task type, ScheduledFutureTask for
   1.130 +     *    tasks, even those that don't require scheduling (i.e.,
   1.131 +     *    those submitted using ExecutorService execute, not
   1.132 +     *    ScheduledExecutorService methods) which are treated as
   1.133 +     *    delayed tasks with a delay of zero.
   1.134 +     *
   1.135 +     * 2. Using a custom queue (DelayedWorkQueue), a variant of
   1.136 +     *    unbounded DelayQueue. The lack of capacity constraint and
   1.137 +     *    the fact that corePoolSize and maximumPoolSize are
   1.138 +     *    effectively identical simplifies some execution mechanics
   1.139 +     *    (see delayedExecute) compared to ThreadPoolExecutor.
   1.140 +     *
   1.141 +     * 3. Supporting optional run-after-shutdown parameters, which
   1.142 +     *    leads to overrides of shutdown methods to remove and cancel
   1.143 +     *    tasks that should NOT be run after shutdown, as well as
   1.144 +     *    different recheck logic when task (re)submission overlaps
   1.145 +     *    with a shutdown.
   1.146 +     *
   1.147 +     * 4. Task decoration methods to allow interception and
   1.148 +     *    instrumentation, which are needed because subclasses cannot
   1.149 +     *    otherwise override submit methods to get this effect. These
   1.150 +     *    don't have any impact on pool control logic though.
   1.151 +     */
   1.152 +
   1.153 +    /**
   1.154 +     * False if should cancel/suppress periodic tasks on shutdown.
   1.155 +     */
   1.156 +    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
   1.157 +
   1.158 +    /**
   1.159 +     * False if should cancel non-periodic tasks on shutdown.
   1.160 +     */
   1.161 +    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
   1.162 +
   1.163 +    /**
   1.164 +     * True if ScheduledFutureTask.cancel should remove from queue
   1.165 +     */
   1.166 +    private volatile boolean removeOnCancel = false;
   1.167 +
   1.168 +    /**
   1.169 +     * Sequence number to break scheduling ties, and in turn to
   1.170 +     * guarantee FIFO order among tied entries.
   1.171 +     */
   1.172 +    private static final AtomicLong sequencer = new AtomicLong(0);
   1.173 +
   1.174 +    /**
   1.175 +     * Returns current nanosecond time.
   1.176 +     */
   1.177 +    final long now() {
   1.178 +        return System.nanoTime();
   1.179 +    }
   1.180 +
   1.181 +    private class ScheduledFutureTask<V>
   1.182 +            extends FutureTask<V> implements RunnableScheduledFuture<V> {
   1.183 +
   1.184 +        /** Sequence number to break ties FIFO */
   1.185 +        private final long sequenceNumber;
   1.186 +
   1.187 +        /** The time the task is enabled to execute in nanoTime units */
   1.188 +        private long time;
   1.189 +
   1.190 +        /**
   1.191 +         * Period in nanoseconds for repeating tasks.  A positive
   1.192 +         * value indicates fixed-rate execution.  A negative value
   1.193 +         * indicates fixed-delay execution.  A value of 0 indicates a
   1.194 +         * non-repeating task.
   1.195 +         */
   1.196 +        private final long period;
   1.197 +
   1.198 +        /** The actual task to be re-enqueued by reExecutePeriodic */
   1.199 +        RunnableScheduledFuture<V> outerTask = this;
   1.200 +
   1.201 +        /**
   1.202 +         * Index into delay queue, to support faster cancellation.
   1.203 +         */
   1.204 +        int heapIndex;
   1.205 +
   1.206 +        /**
   1.207 +         * Creates a one-shot action with given nanoTime-based trigger time.
   1.208 +         */
   1.209 +        ScheduledFutureTask(Runnable r, V result, long ns) {
   1.210 +            super(r, result);
   1.211 +            this.time = ns;
   1.212 +            this.period = 0;
   1.213 +            this.sequenceNumber = sequencer.getAndIncrement();
   1.214 +        }
   1.215 +
   1.216 +        /**
   1.217 +         * Creates a periodic action with given nano time and period.
   1.218 +         */
   1.219 +        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
   1.220 +            super(r, result);
   1.221 +            this.time = ns;
   1.222 +            this.period = period;
   1.223 +            this.sequenceNumber = sequencer.getAndIncrement();
   1.224 +        }
   1.225 +
   1.226 +        /**
   1.227 +         * Creates a one-shot action with given nanoTime-based trigger.
   1.228 +         */
   1.229 +        ScheduledFutureTask(Callable<V> callable, long ns) {
   1.230 +            super(callable);
   1.231 +            this.time = ns;
   1.232 +            this.period = 0;
   1.233 +            this.sequenceNumber = sequencer.getAndIncrement();
   1.234 +        }
   1.235 +
   1.236 +        public long getDelay(TimeUnit unit) {
   1.237 +            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
   1.238 +        }
   1.239 +
   1.240 +        public int compareTo(Delayed other) {
   1.241 +            if (other == this) // compare zero ONLY if same object
   1.242 +                return 0;
   1.243 +            if (other instanceof ScheduledFutureTask) {
   1.244 +                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
   1.245 +                long diff = time - x.time;
   1.246 +                if (diff < 0)
   1.247 +                    return -1;
   1.248 +                else if (diff > 0)
   1.249 +                    return 1;
   1.250 +                else if (sequenceNumber < x.sequenceNumber)
   1.251 +                    return -1;
   1.252 +                else
   1.253 +                    return 1;
   1.254 +            }
   1.255 +            long d = (getDelay(TimeUnit.NANOSECONDS) -
   1.256 +                      other.getDelay(TimeUnit.NANOSECONDS));
   1.257 +            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
   1.258 +        }
   1.259 +
   1.260 +        /**
   1.261 +         * Returns true if this is a periodic (not a one-shot) action.
   1.262 +         *
   1.263 +         * @return true if periodic
   1.264 +         */
   1.265 +        public boolean isPeriodic() {
   1.266 +            return period != 0;
   1.267 +        }
   1.268 +
   1.269 +        /**
   1.270 +         * Sets the next time to run for a periodic task.
   1.271 +         */
   1.272 +        private void setNextRunTime() {
   1.273 +            long p = period;
   1.274 +            if (p > 0)
   1.275 +                time += p;
   1.276 +            else
   1.277 +                time = triggerTime(-p);
   1.278 +        }
   1.279 +
   1.280 +        public boolean cancel(boolean mayInterruptIfRunning) {
   1.281 +            boolean cancelled = super.cancel(mayInterruptIfRunning);
   1.282 +            if (cancelled && removeOnCancel && heapIndex >= 0)
   1.283 +                remove(this);
   1.284 +            return cancelled;
   1.285 +        }
   1.286 +
   1.287 +        /**
   1.288 +         * Overrides FutureTask version so as to reset/requeue if periodic.
   1.289 +         */
   1.290 +        public void run() {
   1.291 +            boolean periodic = isPeriodic();
   1.292 +            if (!canRunInCurrentRunState(periodic))
   1.293 +                cancel(false);
   1.294 +            else if (!periodic)
   1.295 +                ScheduledFutureTask.super.run();
   1.296 +            else if (ScheduledFutureTask.super.runAndReset()) {
   1.297 +                setNextRunTime();
   1.298 +                reExecutePeriodic(outerTask);
   1.299 +            }
   1.300 +        }
   1.301 +    }
   1.302 +
   1.303 +    /**
   1.304 +     * Returns true if can run a task given current run state
   1.305 +     * and run-after-shutdown parameters.
   1.306 +     *
   1.307 +     * @param periodic true if this task periodic, false if delayed
   1.308 +     */
   1.309 +    boolean canRunInCurrentRunState(boolean periodic) {
   1.310 +        return isRunningOrShutdown(periodic ?
   1.311 +                                   continueExistingPeriodicTasksAfterShutdown :
   1.312 +                                   executeExistingDelayedTasksAfterShutdown);
   1.313 +    }
   1.314 +
   1.315 +    /**
   1.316 +     * Main execution method for delayed or periodic tasks.  If pool
   1.317 +     * is shut down, rejects the task. Otherwise adds task to queue
   1.318 +     * and starts a thread, if necessary, to run it.  (We cannot
   1.319 +     * prestart the thread to run the task because the task (probably)
   1.320 +     * shouldn't be run yet,) If the pool is shut down while the task
   1.321 +     * is being added, cancel and remove it if required by state and
   1.322 +     * run-after-shutdown parameters.
   1.323 +     *
   1.324 +     * @param task the task
   1.325 +     */
   1.326 +    private void delayedExecute(RunnableScheduledFuture<?> task) {
   1.327 +        if (isShutdown())
   1.328 +            reject(task);
   1.329 +        else {
   1.330 +            super.getQueue().add(task);
   1.331 +            if (isShutdown() &&
   1.332 +                !canRunInCurrentRunState(task.isPeriodic()) &&
   1.333 +                remove(task))
   1.334 +                task.cancel(false);
   1.335 +            else
   1.336 +                prestartCoreThread();
   1.337 +        }
   1.338 +    }
   1.339 +
   1.340 +    /**
   1.341 +     * Requeues a periodic task unless current run state precludes it.
   1.342 +     * Same idea as delayedExecute except drops task rather than rejecting.
   1.343 +     *
   1.344 +     * @param task the task
   1.345 +     */
   1.346 +    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
   1.347 +        if (canRunInCurrentRunState(true)) {
   1.348 +            super.getQueue().add(task);
   1.349 +            if (!canRunInCurrentRunState(true) && remove(task))
   1.350 +                task.cancel(false);
   1.351 +            else
   1.352 +                prestartCoreThread();
   1.353 +        }
   1.354 +    }
   1.355 +
   1.356 +    /**
   1.357 +     * Cancels and clears the queue of all tasks that should not be run
   1.358 +     * due to shutdown policy.  Invoked within super.shutdown.
   1.359 +     */
   1.360 +    @Override void onShutdown() {
   1.361 +        BlockingQueue<Runnable> q = super.getQueue();
   1.362 +        boolean keepDelayed =
   1.363 +            getExecuteExistingDelayedTasksAfterShutdownPolicy();
   1.364 +        boolean keepPeriodic =
   1.365 +            getContinueExistingPeriodicTasksAfterShutdownPolicy();
   1.366 +        if (!keepDelayed && !keepPeriodic) {
   1.367 +            for (Object e : q.toArray())
   1.368 +                if (e instanceof RunnableScheduledFuture<?>)
   1.369 +                    ((RunnableScheduledFuture<?>) e).cancel(false);
   1.370 +            q.clear();
   1.371 +        }
   1.372 +        else {
   1.373 +            // Traverse snapshot to avoid iterator exceptions
   1.374 +            for (Object e : q.toArray()) {
   1.375 +                if (e instanceof RunnableScheduledFuture) {
   1.376 +                    RunnableScheduledFuture<?> t =
   1.377 +                        (RunnableScheduledFuture<?>)e;
   1.378 +                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
   1.379 +                        t.isCancelled()) { // also remove if already cancelled
   1.380 +                        if (q.remove(t))
   1.381 +                            t.cancel(false);
   1.382 +                    }
   1.383 +                }
   1.384 +            }
   1.385 +        }
   1.386 +        tryTerminate();
   1.387 +    }
   1.388 +
   1.389 +    /**
   1.390 +     * Modifies or replaces the task used to execute a runnable.
   1.391 +     * This method can be used to override the concrete
   1.392 +     * class used for managing internal tasks.
   1.393 +     * The default implementation simply returns the given task.
   1.394 +     *
   1.395 +     * @param runnable the submitted Runnable
   1.396 +     * @param task the task created to execute the runnable
   1.397 +     * @return a task that can execute the runnable
   1.398 +     * @since 1.6
   1.399 +     */
   1.400 +    protected <V> RunnableScheduledFuture<V> decorateTask(
   1.401 +        Runnable runnable, RunnableScheduledFuture<V> task) {
   1.402 +        return task;
   1.403 +    }
   1.404 +
   1.405 +    /**
   1.406 +     * Modifies or replaces the task used to execute a callable.
   1.407 +     * This method can be used to override the concrete
   1.408 +     * class used for managing internal tasks.
   1.409 +     * The default implementation simply returns the given task.
   1.410 +     *
   1.411 +     * @param callable the submitted Callable
   1.412 +     * @param task the task created to execute the callable
   1.413 +     * @return a task that can execute the callable
   1.414 +     * @since 1.6
   1.415 +     */
   1.416 +    protected <V> RunnableScheduledFuture<V> decorateTask(
   1.417 +        Callable<V> callable, RunnableScheduledFuture<V> task) {
   1.418 +        return task;
   1.419 +    }
   1.420 +
   1.421 +    /**
   1.422 +     * Creates a new {@code ScheduledThreadPoolExecutor} with the
   1.423 +     * given core pool size.
   1.424 +     *
   1.425 +     * @param corePoolSize the number of threads to keep in the pool, even
   1.426 +     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   1.427 +     * @throws IllegalArgumentException if {@code corePoolSize < 0}
   1.428 +     */
   1.429 +    public ScheduledThreadPoolExecutor(int corePoolSize) {
   1.430 +        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   1.431 +              new DelayedWorkQueue());
   1.432 +    }
   1.433 +
   1.434 +    /**
   1.435 +     * Creates a new {@code ScheduledThreadPoolExecutor} with the
   1.436 +     * given initial parameters.
   1.437 +     *
   1.438 +     * @param corePoolSize the number of threads to keep in the pool, even
   1.439 +     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   1.440 +     * @param threadFactory the factory to use when the executor
   1.441 +     *        creates a new thread
   1.442 +     * @throws IllegalArgumentException if {@code corePoolSize < 0}
   1.443 +     * @throws NullPointerException if {@code threadFactory} is null
   1.444 +     */
   1.445 +    public ScheduledThreadPoolExecutor(int corePoolSize,
   1.446 +                                       ThreadFactory threadFactory) {
   1.447 +        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   1.448 +              new DelayedWorkQueue(), threadFactory);
   1.449 +    }
   1.450 +
   1.451 +    /**
   1.452 +     * Creates a new ScheduledThreadPoolExecutor with the given
   1.453 +     * initial parameters.
   1.454 +     *
   1.455 +     * @param corePoolSize the number of threads to keep in the pool, even
   1.456 +     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   1.457 +     * @param handler the handler to use when execution is blocked
   1.458 +     *        because the thread bounds and queue capacities are reached
   1.459 +     * @throws IllegalArgumentException if {@code corePoolSize < 0}
   1.460 +     * @throws NullPointerException if {@code handler} is null
   1.461 +     */
   1.462 +    public ScheduledThreadPoolExecutor(int corePoolSize,
   1.463 +                                       RejectedExecutionHandler handler) {
   1.464 +        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   1.465 +              new DelayedWorkQueue(), handler);
   1.466 +    }
   1.467 +
   1.468 +    /**
   1.469 +     * Creates a new ScheduledThreadPoolExecutor with the given
   1.470 +     * initial parameters.
   1.471 +     *
   1.472 +     * @param corePoolSize the number of threads to keep in the pool, even
   1.473 +     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   1.474 +     * @param threadFactory the factory to use when the executor
   1.475 +     *        creates a new thread
   1.476 +     * @param handler the handler to use when execution is blocked
   1.477 +     *        because the thread bounds and queue capacities are reached
   1.478 +     * @throws IllegalArgumentException if {@code corePoolSize < 0}
   1.479 +     * @throws NullPointerException if {@code threadFactory} or
   1.480 +     *         {@code handler} is null
   1.481 +     */
   1.482 +    public ScheduledThreadPoolExecutor(int corePoolSize,
   1.483 +                                       ThreadFactory threadFactory,
   1.484 +                                       RejectedExecutionHandler handler) {
   1.485 +        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   1.486 +              new DelayedWorkQueue(), threadFactory, handler);
   1.487 +    }
   1.488 +
   1.489 +    /**
   1.490 +     * Returns the trigger time of a delayed action.
   1.491 +     */
   1.492 +    private long triggerTime(long delay, TimeUnit unit) {
   1.493 +        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
   1.494 +    }
   1.495 +
   1.496 +    /**
   1.497 +     * Returns the trigger time of a delayed action.
   1.498 +     */
   1.499 +    long triggerTime(long delay) {
   1.500 +        return now() +
   1.501 +            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
   1.502 +    }
   1.503 +
   1.504 +    /**
   1.505 +     * Constrains the values of all delays in the queue to be within
   1.506 +     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
   1.507 +     * This may occur if a task is eligible to be dequeued, but has
   1.508 +     * not yet been, while some other task is added with a delay of
   1.509 +     * Long.MAX_VALUE.
   1.510 +     */
   1.511 +    private long overflowFree(long delay) {
   1.512 +        Delayed head = (Delayed) super.getQueue().peek();
   1.513 +        if (head != null) {
   1.514 +            long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
   1.515 +            if (headDelay < 0 && (delay - headDelay < 0))
   1.516 +                delay = Long.MAX_VALUE + headDelay;
   1.517 +        }
   1.518 +        return delay;
   1.519 +    }
   1.520 +
   1.521 +    /**
   1.522 +     * @throws RejectedExecutionException {@inheritDoc}
   1.523 +     * @throws NullPointerException       {@inheritDoc}
   1.524 +     */
   1.525 +    public ScheduledFuture<?> schedule(Runnable command,
   1.526 +                                       long delay,
   1.527 +                                       TimeUnit unit) {
   1.528 +        if (command == null || unit == null)
   1.529 +            throw new NullPointerException();
   1.530 +        RunnableScheduledFuture<?> t = decorateTask(command,
   1.531 +            new ScheduledFutureTask<Void>(command, null,
   1.532 +                                          triggerTime(delay, unit)));
   1.533 +        delayedExecute(t);
   1.534 +        return t;
   1.535 +    }
   1.536 +
   1.537 +    /**
   1.538 +     * @throws RejectedExecutionException {@inheritDoc}
   1.539 +     * @throws NullPointerException       {@inheritDoc}
   1.540 +     */
   1.541 +    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
   1.542 +                                           long delay,
   1.543 +                                           TimeUnit unit) {
   1.544 +        if (callable == null || unit == null)
   1.545 +            throw new NullPointerException();
   1.546 +        RunnableScheduledFuture<V> t = decorateTask(callable,
   1.547 +            new ScheduledFutureTask<V>(callable,
   1.548 +                                       triggerTime(delay, unit)));
   1.549 +        delayedExecute(t);
   1.550 +        return t;
   1.551 +    }
   1.552 +
   1.553 +    /**
   1.554 +     * @throws RejectedExecutionException {@inheritDoc}
   1.555 +     * @throws NullPointerException       {@inheritDoc}
   1.556 +     * @throws IllegalArgumentException   {@inheritDoc}
   1.557 +     */
   1.558 +    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
   1.559 +                                                  long initialDelay,
   1.560 +                                                  long period,
   1.561 +                                                  TimeUnit unit) {
   1.562 +        if (command == null || unit == null)
   1.563 +            throw new NullPointerException();
   1.564 +        if (period <= 0)
   1.565 +            throw new IllegalArgumentException();
   1.566 +        ScheduledFutureTask<Void> sft =
   1.567 +            new ScheduledFutureTask<Void>(command,
   1.568 +                                          null,
   1.569 +                                          triggerTime(initialDelay, unit),
   1.570 +                                          unit.toNanos(period));
   1.571 +        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   1.572 +        sft.outerTask = t;
   1.573 +        delayedExecute(t);
   1.574 +        return t;
   1.575 +    }
   1.576 +
   1.577 +    /**
   1.578 +     * @throws RejectedExecutionException {@inheritDoc}
   1.579 +     * @throws NullPointerException       {@inheritDoc}
   1.580 +     * @throws IllegalArgumentException   {@inheritDoc}
   1.581 +     */
   1.582 +    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
   1.583 +                                                     long initialDelay,
   1.584 +                                                     long delay,
   1.585 +                                                     TimeUnit unit) {
   1.586 +        if (command == null || unit == null)
   1.587 +            throw new NullPointerException();
   1.588 +        if (delay <= 0)
   1.589 +            throw new IllegalArgumentException();
   1.590 +        ScheduledFutureTask<Void> sft =
   1.591 +            new ScheduledFutureTask<Void>(command,
   1.592 +                                          null,
   1.593 +                                          triggerTime(initialDelay, unit),
   1.594 +                                          unit.toNanos(-delay));
   1.595 +        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   1.596 +        sft.outerTask = t;
   1.597 +        delayedExecute(t);
   1.598 +        return t;
   1.599 +    }
   1.600 +
   1.601 +    /**
   1.602 +     * Executes {@code command} with zero required delay.
   1.603 +     * This has effect equivalent to
   1.604 +     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
   1.605 +     * Note that inspections of the queue and of the list returned by
   1.606 +     * {@code shutdownNow} will access the zero-delayed
   1.607 +     * {@link ScheduledFuture}, not the {@code command} itself.
   1.608 +     *
   1.609 +     * <p>A consequence of the use of {@code ScheduledFuture} objects is
   1.610 +     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
   1.611 +     * called with a null second {@code Throwable} argument, even if the
   1.612 +     * {@code command} terminated abruptly.  Instead, the {@code Throwable}
   1.613 +     * thrown by such a task can be obtained via {@link Future#get}.
   1.614 +     *
   1.615 +     * @throws RejectedExecutionException at discretion of
   1.616 +     *         {@code RejectedExecutionHandler}, if the task
   1.617 +     *         cannot be accepted for execution because the
   1.618 +     *         executor has been shut down
   1.619 +     * @throws NullPointerException {@inheritDoc}
   1.620 +     */
   1.621 +    public void execute(Runnable command) {
   1.622 +        schedule(command, 0, TimeUnit.NANOSECONDS);
   1.623 +    }
   1.624 +
   1.625 +    // Override AbstractExecutorService methods
   1.626 +
   1.627 +    /**
   1.628 +     * @throws RejectedExecutionException {@inheritDoc}
   1.629 +     * @throws NullPointerException       {@inheritDoc}
   1.630 +     */
   1.631 +    public Future<?> submit(Runnable task) {
   1.632 +        return schedule(task, 0, TimeUnit.NANOSECONDS);
   1.633 +    }
   1.634 +
   1.635 +    /**
   1.636 +     * @throws RejectedExecutionException {@inheritDoc}
   1.637 +     * @throws NullPointerException       {@inheritDoc}
   1.638 +     */
   1.639 +    public <T> Future<T> submit(Runnable task, T result) {
   1.640 +        return schedule(Executors.callable(task, result),
   1.641 +                        0, TimeUnit.NANOSECONDS);
   1.642 +    }
   1.643 +
   1.644 +    /**
   1.645 +     * @throws RejectedExecutionException {@inheritDoc}
   1.646 +     * @throws NullPointerException       {@inheritDoc}
   1.647 +     */
   1.648 +    public <T> Future<T> submit(Callable<T> task) {
   1.649 +        return schedule(task, 0, TimeUnit.NANOSECONDS);
   1.650 +    }
   1.651 +
   1.652 +    /**
   1.653 +     * Sets the policy on whether to continue executing existing
   1.654 +     * periodic tasks even when this executor has been {@code shutdown}.
   1.655 +     * In this case, these tasks will only terminate upon
   1.656 +     * {@code shutdownNow} or after setting the policy to
   1.657 +     * {@code false} when already shutdown.
   1.658 +     * This value is by default {@code false}.
   1.659 +     *
   1.660 +     * @param value if {@code true}, continue after shutdown, else don't.
   1.661 +     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
   1.662 +     */
   1.663 +    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
   1.664 +        continueExistingPeriodicTasksAfterShutdown = value;
   1.665 +        if (!value && isShutdown())
   1.666 +            onShutdown();
   1.667 +    }
   1.668 +
   1.669 +    /**
   1.670 +     * Gets the policy on whether to continue executing existing
   1.671 +     * periodic tasks even when this executor has been {@code shutdown}.
   1.672 +     * In this case, these tasks will only terminate upon
   1.673 +     * {@code shutdownNow} or after setting the policy to
   1.674 +     * {@code false} when already shutdown.
   1.675 +     * This value is by default {@code false}.
   1.676 +     *
   1.677 +     * @return {@code true} if will continue after shutdown
   1.678 +     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
   1.679 +     */
   1.680 +    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
   1.681 +        return continueExistingPeriodicTasksAfterShutdown;
   1.682 +    }
   1.683 +
   1.684 +    /**
   1.685 +     * Sets the policy on whether to execute existing delayed
   1.686 +     * tasks even when this executor has been {@code shutdown}.
   1.687 +     * In this case, these tasks will only terminate upon
   1.688 +     * {@code shutdownNow}, or after setting the policy to
   1.689 +     * {@code false} when already shutdown.
   1.690 +     * This value is by default {@code true}.
   1.691 +     *
   1.692 +     * @param value if {@code true}, execute after shutdown, else don't.
   1.693 +     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
   1.694 +     */
   1.695 +    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
   1.696 +        executeExistingDelayedTasksAfterShutdown = value;
   1.697 +        if (!value && isShutdown())
   1.698 +            onShutdown();
   1.699 +    }
   1.700 +
   1.701 +    /**
   1.702 +     * Gets the policy on whether to execute existing delayed
   1.703 +     * tasks even when this executor has been {@code shutdown}.
   1.704 +     * In this case, these tasks will only terminate upon
   1.705 +     * {@code shutdownNow}, or after setting the policy to
   1.706 +     * {@code false} when already shutdown.
   1.707 +     * This value is by default {@code true}.
   1.708 +     *
   1.709 +     * @return {@code true} if will execute after shutdown
   1.710 +     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
   1.711 +     */
   1.712 +    public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
   1.713 +        return executeExistingDelayedTasksAfterShutdown;
   1.714 +    }
   1.715 +
   1.716 +    /**
   1.717 +     * Sets the policy on whether cancelled tasks should be immediately
   1.718 +     * removed from the work queue at time of cancellation.  This value is
   1.719 +     * by default {@code false}.
   1.720 +     *
   1.721 +     * @param value if {@code true}, remove on cancellation, else don't
   1.722 +     * @see #getRemoveOnCancelPolicy
   1.723 +     * @since 1.7
   1.724 +     */
   1.725 +    public void setRemoveOnCancelPolicy(boolean value) {
   1.726 +        removeOnCancel = value;
   1.727 +    }
   1.728 +
   1.729 +    /**
   1.730 +     * Gets the policy on whether cancelled tasks should be immediately
   1.731 +     * removed from the work queue at time of cancellation.  This value is
   1.732 +     * by default {@code false}.
   1.733 +     *
   1.734 +     * @return {@code true} if cancelled tasks are immediately removed
   1.735 +     *         from the queue
   1.736 +     * @see #setRemoveOnCancelPolicy
   1.737 +     * @since 1.7
   1.738 +     */
   1.739 +    public boolean getRemoveOnCancelPolicy() {
   1.740 +        return removeOnCancel;
   1.741 +    }
   1.742 +
   1.743 +    /**
   1.744 +     * Initiates an orderly shutdown in which previously submitted
   1.745 +     * tasks are executed, but no new tasks will be accepted.
   1.746 +     * Invocation has no additional effect if already shut down.
   1.747 +     *
   1.748 +     * <p>This method does not wait for previously submitted tasks to
   1.749 +     * complete execution.  Use {@link #awaitTermination awaitTermination}
   1.750 +     * to do that.
   1.751 +     *
   1.752 +     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
   1.753 +     * has been set {@code false}, existing delayed tasks whose delays
   1.754 +     * have not yet elapsed are cancelled.  And unless the {@code
   1.755 +     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
   1.756 +     * {@code true}, future executions of existing periodic tasks will
   1.757 +     * be cancelled.
   1.758 +     *
   1.759 +     * @throws SecurityException {@inheritDoc}
   1.760 +     */
   1.761 +    public void shutdown() {
   1.762 +        super.shutdown();
   1.763 +    }
   1.764 +
   1.765 +    /**
   1.766 +     * Attempts to stop all actively executing tasks, halts the
   1.767 +     * processing of waiting tasks, and returns a list of the tasks
   1.768 +     * that were awaiting execution.
   1.769 +     *
   1.770 +     * <p>This method does not wait for actively executing tasks to
   1.771 +     * terminate.  Use {@link #awaitTermination awaitTermination} to
   1.772 +     * do that.
   1.773 +     *
   1.774 +     * <p>There are no guarantees beyond best-effort attempts to stop
   1.775 +     * processing actively executing tasks.  This implementation
   1.776 +     * cancels tasks via {@link Thread#interrupt}, so any task that
   1.777 +     * fails to respond to interrupts may never terminate.
   1.778 +     *
   1.779 +     * @return list of tasks that never commenced execution.
   1.780 +     *         Each element of this list is a {@link ScheduledFuture},
   1.781 +     *         including those tasks submitted using {@code execute},
   1.782 +     *         which are for scheduling purposes used as the basis of a
   1.783 +     *         zero-delay {@code ScheduledFuture}.
   1.784 +     * @throws SecurityException {@inheritDoc}
   1.785 +     */
   1.786 +    public List<Runnable> shutdownNow() {
   1.787 +        return super.shutdownNow();
   1.788 +    }
   1.789 +
   1.790 +    /**
   1.791 +     * Returns the task queue used by this executor.  Each element of
   1.792 +     * this queue is a {@link ScheduledFuture}, including those
   1.793 +     * tasks submitted using {@code execute} which are for scheduling
   1.794 +     * purposes used as the basis of a zero-delay
   1.795 +     * {@code ScheduledFuture}.  Iteration over this queue is
   1.796 +     * <em>not</em> guaranteed to traverse tasks in the order in
   1.797 +     * which they will execute.
   1.798 +     *
   1.799 +     * @return the task queue
   1.800 +     */
   1.801 +    public BlockingQueue<Runnable> getQueue() {
   1.802 +        return super.getQueue();
   1.803 +    }
   1.804 +
   1.805 +    /**
   1.806 +     * Specialized delay queue. To mesh with TPE declarations, this
   1.807 +     * class must be declared as a BlockingQueue<Runnable> even though
   1.808 +     * it can only hold RunnableScheduledFutures.
   1.809 +     */
   1.810 +    static class DelayedWorkQueue extends AbstractQueue<Runnable>
   1.811 +        implements BlockingQueue<Runnable> {
   1.812 +
   1.813 +        /*
   1.814 +         * A DelayedWorkQueue is based on a heap-based data structure
   1.815 +         * like those in DelayQueue and PriorityQueue, except that
   1.816 +         * every ScheduledFutureTask also records its index into the
   1.817 +         * heap array. This eliminates the need to find a task upon
   1.818 +         * cancellation, greatly speeding up removal (down from O(n)
   1.819 +         * to O(log n)), and reducing garbage retention that would
   1.820 +         * otherwise occur by waiting for the element to rise to top
   1.821 +         * before clearing. But because the queue may also hold
   1.822 +         * RunnableScheduledFutures that are not ScheduledFutureTasks,
   1.823 +         * we are not guaranteed to have such indices available, in
   1.824 +         * which case we fall back to linear search. (We expect that
   1.825 +         * most tasks will not be decorated, and that the faster cases
   1.826 +         * will be much more common.)
   1.827 +         *
   1.828 +         * All heap operations must record index changes -- mainly
   1.829 +         * within siftUp and siftDown. Upon removal, a task's
   1.830 +         * heapIndex is set to -1. Note that ScheduledFutureTasks can
   1.831 +         * appear at most once in the queue (this need not be true for
   1.832 +         * other kinds of tasks or work queues), so are uniquely
   1.833 +         * identified by heapIndex.
   1.834 +         */
   1.835 +
   1.836 +        private static final int INITIAL_CAPACITY = 16;
   1.837 +        private RunnableScheduledFuture[] queue =
   1.838 +            new RunnableScheduledFuture[INITIAL_CAPACITY];
   1.839 +        private final ReentrantLock lock = new ReentrantLock();
   1.840 +        private int size = 0;
   1.841 +
   1.842 +        /**
   1.843 +         * Thread designated to wait for the task at the head of the
   1.844 +         * queue.  This variant of the Leader-Follower pattern
   1.845 +         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
   1.846 +         * minimize unnecessary timed waiting.  When a thread becomes
   1.847 +         * the leader, it waits only for the next delay to elapse, but
   1.848 +         * other threads await indefinitely.  The leader thread must
   1.849 +         * signal some other thread before returning from take() or
   1.850 +         * poll(...), unless some other thread becomes leader in the
   1.851 +         * interim.  Whenever the head of the queue is replaced with a
   1.852 +         * task with an earlier expiration time, the leader field is
   1.853 +         * invalidated by being reset to null, and some waiting
   1.854 +         * thread, but not necessarily the current leader, is
   1.855 +         * signalled.  So waiting threads must be prepared to acquire
   1.856 +         * and lose leadership while waiting.
   1.857 +         */
   1.858 +        private Thread leader = null;
   1.859 +
   1.860 +        /**
   1.861 +         * Condition signalled when a newer task becomes available at the
   1.862 +         * head of the queue or a new thread may need to become leader.
   1.863 +         */
   1.864 +        private final Condition available = lock.newCondition();
   1.865 +
   1.866 +        /**
   1.867 +         * Set f's heapIndex if it is a ScheduledFutureTask.
   1.868 +         */
   1.869 +        private void setIndex(RunnableScheduledFuture f, int idx) {
   1.870 +            if (f instanceof ScheduledFutureTask)
   1.871 +                ((ScheduledFutureTask)f).heapIndex = idx;
   1.872 +        }
   1.873 +
   1.874 +        /**
   1.875 +         * Sift element added at bottom up to its heap-ordered spot.
   1.876 +         * Call only when holding lock.
   1.877 +         */
   1.878 +        private void siftUp(int k, RunnableScheduledFuture key) {
   1.879 +            while (k > 0) {
   1.880 +                int parent = (k - 1) >>> 1;
   1.881 +                RunnableScheduledFuture e = queue[parent];
   1.882 +                if (key.compareTo(e) >= 0)
   1.883 +                    break;
   1.884 +                queue[k] = e;
   1.885 +                setIndex(e, k);
   1.886 +                k = parent;
   1.887 +            }
   1.888 +            queue[k] = key;
   1.889 +            setIndex(key, k);
   1.890 +        }
   1.891 +
   1.892 +        /**
   1.893 +         * Sift element added at top down to its heap-ordered spot.
   1.894 +         * Call only when holding lock.
   1.895 +         */
   1.896 +        private void siftDown(int k, RunnableScheduledFuture key) {
   1.897 +            int half = size >>> 1;
   1.898 +            while (k < half) {
   1.899 +                int child = (k << 1) + 1;
   1.900 +                RunnableScheduledFuture c = queue[child];
   1.901 +                int right = child + 1;
   1.902 +                if (right < size && c.compareTo(queue[right]) > 0)
   1.903 +                    c = queue[child = right];
   1.904 +                if (key.compareTo(c) <= 0)
   1.905 +                    break;
   1.906 +                queue[k] = c;
   1.907 +                setIndex(c, k);
   1.908 +                k = child;
   1.909 +            }
   1.910 +            queue[k] = key;
   1.911 +            setIndex(key, k);
   1.912 +        }
   1.913 +
   1.914 +        /**
   1.915 +         * Resize the heap array.  Call only when holding lock.
   1.916 +         */
   1.917 +        private void grow() {
   1.918 +            int oldCapacity = queue.length;
   1.919 +            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
   1.920 +            if (newCapacity < 0) // overflow
   1.921 +                newCapacity = Integer.MAX_VALUE;
   1.922 +            queue = Arrays.copyOf(queue, newCapacity);
   1.923 +        }
   1.924 +
   1.925 +        /**
   1.926 +         * Find index of given object, or -1 if absent
   1.927 +         */
   1.928 +        private int indexOf(Object x) {
   1.929 +            if (x != null) {
   1.930 +                if (x instanceof ScheduledFutureTask) {
   1.931 +                    int i = ((ScheduledFutureTask) x).heapIndex;
   1.932 +                    // Sanity check; x could conceivably be a
   1.933 +                    // ScheduledFutureTask from some other pool.
   1.934 +                    if (i >= 0 && i < size && queue[i] == x)
   1.935 +                        return i;
   1.936 +                } else {
   1.937 +                    for (int i = 0; i < size; i++)
   1.938 +                        if (x.equals(queue[i]))
   1.939 +                            return i;
   1.940 +                }
   1.941 +            }
   1.942 +            return -1;
   1.943 +        }
   1.944 +
   1.945 +        public boolean contains(Object x) {
   1.946 +            final ReentrantLock lock = this.lock;
   1.947 +            lock.lock();
   1.948 +            try {
   1.949 +                return indexOf(x) != -1;
   1.950 +            } finally {
   1.951 +                lock.unlock();
   1.952 +            }
   1.953 +        }
   1.954 +
   1.955 +        public boolean remove(Object x) {
   1.956 +            final ReentrantLock lock = this.lock;
   1.957 +            lock.lock();
   1.958 +            try {
   1.959 +                int i = indexOf(x);
   1.960 +                if (i < 0)
   1.961 +                    return false;
   1.962 +
   1.963 +                setIndex(queue[i], -1);
   1.964 +                int s = --size;
   1.965 +                RunnableScheduledFuture replacement = queue[s];
   1.966 +                queue[s] = null;
   1.967 +                if (s != i) {
   1.968 +                    siftDown(i, replacement);
   1.969 +                    if (queue[i] == replacement)
   1.970 +                        siftUp(i, replacement);
   1.971 +                }
   1.972 +                return true;
   1.973 +            } finally {
   1.974 +                lock.unlock();
   1.975 +            }
   1.976 +        }
   1.977 +
   1.978 +        public int size() {
   1.979 +            final ReentrantLock lock = this.lock;
   1.980 +            lock.lock();
   1.981 +            try {
   1.982 +                return size;
   1.983 +            } finally {
   1.984 +                lock.unlock();
   1.985 +            }
   1.986 +        }
   1.987 +
   1.988 +        public boolean isEmpty() {
   1.989 +            return size() == 0;
   1.990 +        }
   1.991 +
   1.992 +        public int remainingCapacity() {
   1.993 +            return Integer.MAX_VALUE;
   1.994 +        }
   1.995 +
   1.996 +        public RunnableScheduledFuture peek() {
   1.997 +            final ReentrantLock lock = this.lock;
   1.998 +            lock.lock();
   1.999 +            try {
  1.1000 +                return queue[0];
  1.1001 +            } finally {
  1.1002 +                lock.unlock();
  1.1003 +            }
  1.1004 +        }
  1.1005 +
  1.1006 +        public boolean offer(Runnable x) {
  1.1007 +            if (x == null)
  1.1008 +                throw new NullPointerException();
  1.1009 +            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
  1.1010 +            final ReentrantLock lock = this.lock;
  1.1011 +            lock.lock();
  1.1012 +            try {
  1.1013 +                int i = size;
  1.1014 +                if (i >= queue.length)
  1.1015 +                    grow();
  1.1016 +                size = i + 1;
  1.1017 +                if (i == 0) {
  1.1018 +                    queue[0] = e;
  1.1019 +                    setIndex(e, 0);
  1.1020 +                } else {
  1.1021 +                    siftUp(i, e);
  1.1022 +                }
  1.1023 +                if (queue[0] == e) {
  1.1024 +                    leader = null;
  1.1025 +                    available.signal();
  1.1026 +                }
  1.1027 +            } finally {
  1.1028 +                lock.unlock();
  1.1029 +            }
  1.1030 +            return true;
  1.1031 +        }
  1.1032 +
  1.1033 +        public void put(Runnable e) {
  1.1034 +            offer(e);
  1.1035 +        }
  1.1036 +
  1.1037 +        public boolean add(Runnable e) {
  1.1038 +            return offer(e);
  1.1039 +        }
  1.1040 +
  1.1041 +        public boolean offer(Runnable e, long timeout, TimeUnit unit) {
  1.1042 +            return offer(e);
  1.1043 +        }
  1.1044 +
  1.1045 +        /**
  1.1046 +         * Performs common bookkeeping for poll and take: Replaces
  1.1047 +         * first element with last and sifts it down.  Call only when
  1.1048 +         * holding lock.
  1.1049 +         * @param f the task to remove and return
  1.1050 +         */
  1.1051 +        private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
  1.1052 +            int s = --size;
  1.1053 +            RunnableScheduledFuture x = queue[s];
  1.1054 +            queue[s] = null;
  1.1055 +            if (s != 0)
  1.1056 +                siftDown(0, x);
  1.1057 +            setIndex(f, -1);
  1.1058 +            return f;
  1.1059 +        }
  1.1060 +
  1.1061 +        public RunnableScheduledFuture poll() {
  1.1062 +            final ReentrantLock lock = this.lock;
  1.1063 +            lock.lock();
  1.1064 +            try {
  1.1065 +                RunnableScheduledFuture first = queue[0];
  1.1066 +                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  1.1067 +                    return null;
  1.1068 +                else
  1.1069 +                    return finishPoll(first);
  1.1070 +            } finally {
  1.1071 +                lock.unlock();
  1.1072 +            }
  1.1073 +        }
  1.1074 +
  1.1075 +        public RunnableScheduledFuture take() throws InterruptedException {
  1.1076 +            final ReentrantLock lock = this.lock;
  1.1077 +            lock.lockInterruptibly();
  1.1078 +            try {
  1.1079 +                for (;;) {
  1.1080 +                    RunnableScheduledFuture first = queue[0];
  1.1081 +                    if (first == null)
  1.1082 +                        available.await();
  1.1083 +                    else {
  1.1084 +                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
  1.1085 +                        if (delay <= 0)
  1.1086 +                            return finishPoll(first);
  1.1087 +                        else if (leader != null)
  1.1088 +                            available.await();
  1.1089 +                        else {
  1.1090 +                            Thread thisThread = Thread.currentThread();
  1.1091 +                            leader = thisThread;
  1.1092 +                            try {
  1.1093 +                                available.awaitNanos(delay);
  1.1094 +                            } finally {
  1.1095 +                                if (leader == thisThread)
  1.1096 +                                    leader = null;
  1.1097 +                            }
  1.1098 +                        }
  1.1099 +                    }
  1.1100 +                }
  1.1101 +            } finally {
  1.1102 +                if (leader == null && queue[0] != null)
  1.1103 +                    available.signal();
  1.1104 +                lock.unlock();
  1.1105 +            }
  1.1106 +        }
  1.1107 +
  1.1108 +        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
  1.1109 +            throws InterruptedException {
  1.1110 +            long nanos = unit.toNanos(timeout);
  1.1111 +            final ReentrantLock lock = this.lock;
  1.1112 +            lock.lockInterruptibly();
  1.1113 +            try {
  1.1114 +                for (;;) {
  1.1115 +                    RunnableScheduledFuture first = queue[0];
  1.1116 +                    if (first == null) {
  1.1117 +                        if (nanos <= 0)
  1.1118 +                            return null;
  1.1119 +                        else
  1.1120 +                            nanos = available.awaitNanos(nanos);
  1.1121 +                    } else {
  1.1122 +                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
  1.1123 +                        if (delay <= 0)
  1.1124 +                            return finishPoll(first);
  1.1125 +                        if (nanos <= 0)
  1.1126 +                            return null;
  1.1127 +                        if (nanos < delay || leader != null)
  1.1128 +                            nanos = available.awaitNanos(nanos);
  1.1129 +                        else {
  1.1130 +                            Thread thisThread = Thread.currentThread();
  1.1131 +                            leader = thisThread;
  1.1132 +                            try {
  1.1133 +                                long timeLeft = available.awaitNanos(delay);
  1.1134 +                                nanos -= delay - timeLeft;
  1.1135 +                            } finally {
  1.1136 +                                if (leader == thisThread)
  1.1137 +                                    leader = null;
  1.1138 +                            }
  1.1139 +                        }
  1.1140 +                    }
  1.1141 +                }
  1.1142 +            } finally {
  1.1143 +                if (leader == null && queue[0] != null)
  1.1144 +                    available.signal();
  1.1145 +                lock.unlock();
  1.1146 +            }
  1.1147 +        }
  1.1148 +
  1.1149 +        public void clear() {
  1.1150 +            final ReentrantLock lock = this.lock;
  1.1151 +            lock.lock();
  1.1152 +            try {
  1.1153 +                for (int i = 0; i < size; i++) {
  1.1154 +                    RunnableScheduledFuture t = queue[i];
  1.1155 +                    if (t != null) {
  1.1156 +                        queue[i] = null;
  1.1157 +                        setIndex(t, -1);
  1.1158 +                    }
  1.1159 +                }
  1.1160 +                size = 0;
  1.1161 +            } finally {
  1.1162 +                lock.unlock();
  1.1163 +            }
  1.1164 +        }
  1.1165 +
  1.1166 +        /**
  1.1167 +         * Return and remove first element only if it is expired.
  1.1168 +         * Used only by drainTo.  Call only when holding lock.
  1.1169 +         */
  1.1170 +        private RunnableScheduledFuture pollExpired() {
  1.1171 +            RunnableScheduledFuture first = queue[0];
  1.1172 +            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  1.1173 +                return null;
  1.1174 +            return finishPoll(first);
  1.1175 +        }
  1.1176 +
  1.1177 +        public int drainTo(Collection<? super Runnable> c) {
  1.1178 +            if (c == null)
  1.1179 +                throw new NullPointerException();
  1.1180 +            if (c == this)
  1.1181 +                throw new IllegalArgumentException();
  1.1182 +            final ReentrantLock lock = this.lock;
  1.1183 +            lock.lock();
  1.1184 +            try {
  1.1185 +                RunnableScheduledFuture first;
  1.1186 +                int n = 0;
  1.1187 +                while ((first = pollExpired()) != null) {
  1.1188 +                    c.add(first);
  1.1189 +                    ++n;
  1.1190 +                }
  1.1191 +                return n;
  1.1192 +            } finally {
  1.1193 +                lock.unlock();
  1.1194 +            }
  1.1195 +        }
  1.1196 +
  1.1197 +        public int drainTo(Collection<? super Runnable> c, int maxElements) {
  1.1198 +            if (c == null)
  1.1199 +                throw new NullPointerException();
  1.1200 +            if (c == this)
  1.1201 +                throw new IllegalArgumentException();
  1.1202 +            if (maxElements <= 0)
  1.1203 +                return 0;
  1.1204 +            final ReentrantLock lock = this.lock;
  1.1205 +            lock.lock();
  1.1206 +            try {
  1.1207 +                RunnableScheduledFuture first;
  1.1208 +                int n = 0;
  1.1209 +                while (n < maxElements && (first = pollExpired()) != null) {
  1.1210 +                    c.add(first);
  1.1211 +                    ++n;
  1.1212 +                }
  1.1213 +                return n;
  1.1214 +            } finally {
  1.1215 +                lock.unlock();
  1.1216 +            }
  1.1217 +        }
  1.1218 +
  1.1219 +        public Object[] toArray() {
  1.1220 +            final ReentrantLock lock = this.lock;
  1.1221 +            lock.lock();
  1.1222 +            try {
  1.1223 +                return Arrays.copyOf(queue, size, Object[].class);
  1.1224 +            } finally {
  1.1225 +                lock.unlock();
  1.1226 +            }
  1.1227 +        }
  1.1228 +
  1.1229 +        @SuppressWarnings("unchecked")
  1.1230 +        public <T> T[] toArray(T[] a) {
  1.1231 +            final ReentrantLock lock = this.lock;
  1.1232 +            lock.lock();
  1.1233 +            try {
  1.1234 +                if (a.length < size)
  1.1235 +                    return (T[]) Arrays.copyOf(queue, size, a.getClass());
  1.1236 +                System.arraycopy(queue, 0, a, 0, size);
  1.1237 +                if (a.length > size)
  1.1238 +                    a[size] = null;
  1.1239 +                return a;
  1.1240 +            } finally {
  1.1241 +                lock.unlock();
  1.1242 +            }
  1.1243 +        }
  1.1244 +
  1.1245 +        public Iterator<Runnable> iterator() {
  1.1246 +            return new Itr(Arrays.copyOf(queue, size));
  1.1247 +        }
  1.1248 +
  1.1249 +        /**
  1.1250 +         * Snapshot iterator that works off copy of underlying q array.
  1.1251 +         */
  1.1252 +        private class Itr implements Iterator<Runnable> {
  1.1253 +            final RunnableScheduledFuture[] array;
  1.1254 +            int cursor = 0;     // index of next element to return
  1.1255 +            int lastRet = -1;   // index of last element, or -1 if no such
  1.1256 +
  1.1257 +            Itr(RunnableScheduledFuture[] array) {
  1.1258 +                this.array = array;
  1.1259 +            }
  1.1260 +
  1.1261 +            public boolean hasNext() {
  1.1262 +                return cursor < array.length;
  1.1263 +            }
  1.1264 +
  1.1265 +            public Runnable next() {
  1.1266 +                if (cursor >= array.length)
  1.1267 +                    throw new NoSuchElementException();
  1.1268 +                lastRet = cursor;
  1.1269 +                return array[cursor++];
  1.1270 +            }
  1.1271 +
  1.1272 +            public void remove() {
  1.1273 +                if (lastRet < 0)
  1.1274 +                    throw new IllegalStateException();
  1.1275 +                DelayedWorkQueue.this.remove(array[lastRet]);
  1.1276 +                lastRet = -1;
  1.1277 +            }
  1.1278 +        }
  1.1279 +    }
  1.1280 +}