rt/emul/compact/src/main/java/java/util/concurrent/ScheduledThreadPoolExecutor.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branchjdk7-b147
changeset 1890 212417b74b72
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 import java.util.concurrent.atomic.*;
    38 import java.util.concurrent.locks.*;
    39 import java.util.*;
    40 
    41 /**
    42  * A {@link ThreadPoolExecutor} that can additionally schedule
    43  * commands to run after a given delay, or to execute
    44  * periodically. This class is preferable to {@link java.util.Timer}
    45  * when multiple worker threads are needed, or when the additional
    46  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
    47  * this class extends) are required.
    48  *
    49  * <p>Delayed tasks execute no sooner than they are enabled, but
    50  * without any real-time guarantees about when, after they are
    51  * enabled, they will commence. Tasks scheduled for exactly the same
    52  * execution time are enabled in first-in-first-out (FIFO) order of
    53  * submission.
    54  *
    55  * <p>When a submitted task is cancelled before it is run, execution
    56  * is suppressed. By default, such a cancelled task is not
    57  * automatically removed from the work queue until its delay
    58  * elapses. While this enables further inspection and monitoring, it
    59  * may also cause unbounded retention of cancelled tasks. To avoid
    60  * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
    61  * causes tasks to be immediately removed from the work queue at
    62  * time of cancellation.
    63  *
    64  * <p>Successive executions of a task scheduled via
    65  * {@code scheduleAtFixedRate} or
    66  * {@code scheduleWithFixedDelay} do not overlap. While different
    67  * executions may be performed by different threads, the effects of
    68  * prior executions <a
    69  * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
    70  * those of subsequent ones.
    71  *
    72  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
    73  * of the inherited tuning methods are not useful for it. In
    74  * particular, because it acts as a fixed-sized pool using
    75  * {@code corePoolSize} threads and an unbounded queue, adjustments
    76  * to {@code maximumPoolSize} have no useful effect. Additionally, it
    77  * is almost never a good idea to set {@code corePoolSize} to zero or
    78  * use {@code allowCoreThreadTimeOut} because this may leave the pool
    79  * without threads to handle tasks once they become eligible to run.
    80  *
    81  * <p><b>Extension notes:</b> This class overrides the
    82  * {@link ThreadPoolExecutor#execute execute} and
    83  * {@link AbstractExecutorService#submit(Runnable) submit}
    84  * methods to generate internal {@link ScheduledFuture} objects to
    85  * control per-task delays and scheduling.  To preserve
    86  * functionality, any further overrides of these methods in
    87  * subclasses must invoke superclass versions, which effectively
    88  * disables additional task customization.  However, this class
    89  * provides alternative protected extension method
    90  * {@code decorateTask} (one version each for {@code Runnable} and
    91  * {@code Callable}) that can be used to customize the concrete task
    92  * types used to execute commands entered via {@code execute},
    93  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
    94  * and {@code scheduleWithFixedDelay}.  By default, a
    95  * {@code ScheduledThreadPoolExecutor} uses a task type extending
    96  * {@link FutureTask}. However, this may be modified or replaced using
    97  * subclasses of the form:
    98  *
    99  *  <pre> {@code
   100  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
   101  *
   102  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
   103  *
   104  *   protected <V> RunnableScheduledFuture<V> decorateTask(
   105  *                Runnable r, RunnableScheduledFuture<V> task) {
   106  *       return new CustomTask<V>(r, task);
   107  *   }
   108  *
   109  *   protected <V> RunnableScheduledFuture<V> decorateTask(
   110  *                Callable<V> c, RunnableScheduledFuture<V> task) {
   111  *       return new CustomTask<V>(c, task);
   112  *   }
   113  *   // ... add constructors, etc.
   114  * }}</pre>
   115  *
   116  * @since 1.5
   117  * @author Doug Lea
   118  */
   119 public class ScheduledThreadPoolExecutor
   120         extends ThreadPoolExecutor
   121         implements ScheduledExecutorService {
   122 
   123     /*
   124      * This class specializes ThreadPoolExecutor implementation by
   125      *
   126      * 1. Using a custom task type, ScheduledFutureTask for
   127      *    tasks, even those that don't require scheduling (i.e.,
   128      *    those submitted using ExecutorService execute, not
   129      *    ScheduledExecutorService methods) which are treated as
   130      *    delayed tasks with a delay of zero.
   131      *
   132      * 2. Using a custom queue (DelayedWorkQueue), a variant of
   133      *    unbounded DelayQueue. The lack of capacity constraint and
   134      *    the fact that corePoolSize and maximumPoolSize are
   135      *    effectively identical simplifies some execution mechanics
   136      *    (see delayedExecute) compared to ThreadPoolExecutor.
   137      *
   138      * 3. Supporting optional run-after-shutdown parameters, which
   139      *    leads to overrides of shutdown methods to remove and cancel
   140      *    tasks that should NOT be run after shutdown, as well as
   141      *    different recheck logic when task (re)submission overlaps
   142      *    with a shutdown.
   143      *
   144      * 4. Task decoration methods to allow interception and
   145      *    instrumentation, which are needed because subclasses cannot
   146      *    otherwise override submit methods to get this effect. These
   147      *    don't have any impact on pool control logic though.
   148      */
   149 
   150     /**
   151      * False if should cancel/suppress periodic tasks on shutdown.
   152      */
   153     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
   154 
   155     /**
   156      * False if should cancel non-periodic tasks on shutdown.
   157      */
   158     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
   159 
   160     /**
   161      * True if ScheduledFutureTask.cancel should remove from queue
   162      */
   163     private volatile boolean removeOnCancel = false;
   164 
   165     /**
   166      * Sequence number to break scheduling ties, and in turn to
   167      * guarantee FIFO order among tied entries.
   168      */
   169     private static final AtomicLong sequencer = new AtomicLong(0);
   170 
   171     /**
   172      * Returns current nanosecond time.
   173      */
   174     final long now() {
   175         return System.nanoTime();
   176     }
   177 
   178     private class ScheduledFutureTask<V>
   179             extends FutureTask<V> implements RunnableScheduledFuture<V> {
   180 
   181         /** Sequence number to break ties FIFO */
   182         private final long sequenceNumber;
   183 
   184         /** The time the task is enabled to execute in nanoTime units */
   185         private long time;
   186 
   187         /**
   188          * Period in nanoseconds for repeating tasks.  A positive
   189          * value indicates fixed-rate execution.  A negative value
   190          * indicates fixed-delay execution.  A value of 0 indicates a
   191          * non-repeating task.
   192          */
   193         private final long period;
   194 
   195         /** The actual task to be re-enqueued by reExecutePeriodic */
   196         RunnableScheduledFuture<V> outerTask = this;
   197 
   198         /**
   199          * Index into delay queue, to support faster cancellation.
   200          */
   201         int heapIndex;
   202 
   203         /**
   204          * Creates a one-shot action with given nanoTime-based trigger time.
   205          */
   206         ScheduledFutureTask(Runnable r, V result, long ns) {
   207             super(r, result);
   208             this.time = ns;
   209             this.period = 0;
   210             this.sequenceNumber = sequencer.getAndIncrement();
   211         }
   212 
   213         /**
   214          * Creates a periodic action with given nano time and period.
   215          */
   216         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
   217             super(r, result);
   218             this.time = ns;
   219             this.period = period;
   220             this.sequenceNumber = sequencer.getAndIncrement();
   221         }
   222 
   223         /**
   224          * Creates a one-shot action with given nanoTime-based trigger.
   225          */
   226         ScheduledFutureTask(Callable<V> callable, long ns) {
   227             super(callable);
   228             this.time = ns;
   229             this.period = 0;
   230             this.sequenceNumber = sequencer.getAndIncrement();
   231         }
   232 
   233         public long getDelay(TimeUnit unit) {
   234             return unit.convert(time - now(), TimeUnit.NANOSECONDS);
   235         }
   236 
   237         public int compareTo(Delayed other) {
   238             if (other == this) // compare zero ONLY if same object
   239                 return 0;
   240             if (other instanceof ScheduledFutureTask) {
   241                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
   242                 long diff = time - x.time;
   243                 if (diff < 0)
   244                     return -1;
   245                 else if (diff > 0)
   246                     return 1;
   247                 else if (sequenceNumber < x.sequenceNumber)
   248                     return -1;
   249                 else
   250                     return 1;
   251             }
   252             long d = (getDelay(TimeUnit.NANOSECONDS) -
   253                       other.getDelay(TimeUnit.NANOSECONDS));
   254             return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
   255         }
   256 
   257         /**
   258          * Returns true if this is a periodic (not a one-shot) action.
   259          *
   260          * @return true if periodic
   261          */
   262         public boolean isPeriodic() {
   263             return period != 0;
   264         }
   265 
   266         /**
   267          * Sets the next time to run for a periodic task.
   268          */
   269         private void setNextRunTime() {
   270             long p = period;
   271             if (p > 0)
   272                 time += p;
   273             else
   274                 time = triggerTime(-p);
   275         }
   276 
   277         public boolean cancel(boolean mayInterruptIfRunning) {
   278             boolean cancelled = super.cancel(mayInterruptIfRunning);
   279             if (cancelled && removeOnCancel && heapIndex >= 0)
   280                 remove(this);
   281             return cancelled;
   282         }
   283 
   284         /**
   285          * Overrides FutureTask version so as to reset/requeue if periodic.
   286          */
   287         public void run() {
   288             boolean periodic = isPeriodic();
   289             if (!canRunInCurrentRunState(periodic))
   290                 cancel(false);
   291             else if (!periodic)
   292                 ScheduledFutureTask.super.run();
   293             else if (ScheduledFutureTask.super.runAndReset()) {
   294                 setNextRunTime();
   295                 reExecutePeriodic(outerTask);
   296             }
   297         }
   298     }
   299 
   300     /**
   301      * Returns true if can run a task given current run state
   302      * and run-after-shutdown parameters.
   303      *
   304      * @param periodic true if this task periodic, false if delayed
   305      */
   306     boolean canRunInCurrentRunState(boolean periodic) {
   307         return isRunningOrShutdown(periodic ?
   308                                    continueExistingPeriodicTasksAfterShutdown :
   309                                    executeExistingDelayedTasksAfterShutdown);
   310     }
   311 
   312     /**
   313      * Main execution method for delayed or periodic tasks.  If pool
   314      * is shut down, rejects the task. Otherwise adds task to queue
   315      * and starts a thread, if necessary, to run it.  (We cannot
   316      * prestart the thread to run the task because the task (probably)
   317      * shouldn't be run yet,) If the pool is shut down while the task
   318      * is being added, cancel and remove it if required by state and
   319      * run-after-shutdown parameters.
   320      *
   321      * @param task the task
   322      */
   323     private void delayedExecute(RunnableScheduledFuture<?> task) {
   324         if (isShutdown())
   325             reject(task);
   326         else {
   327             super.getQueue().add(task);
   328             if (isShutdown() &&
   329                 !canRunInCurrentRunState(task.isPeriodic()) &&
   330                 remove(task))
   331                 task.cancel(false);
   332             else
   333                 prestartCoreThread();
   334         }
   335     }
   336 
   337     /**
   338      * Requeues a periodic task unless current run state precludes it.
   339      * Same idea as delayedExecute except drops task rather than rejecting.
   340      *
   341      * @param task the task
   342      */
   343     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
   344         if (canRunInCurrentRunState(true)) {
   345             super.getQueue().add(task);
   346             if (!canRunInCurrentRunState(true) && remove(task))
   347                 task.cancel(false);
   348             else
   349                 prestartCoreThread();
   350         }
   351     }
   352 
   353     /**
   354      * Cancels and clears the queue of all tasks that should not be run
   355      * due to shutdown policy.  Invoked within super.shutdown.
   356      */
   357     @Override void onShutdown() {
   358         BlockingQueue<Runnable> q = super.getQueue();
   359         boolean keepDelayed =
   360             getExecuteExistingDelayedTasksAfterShutdownPolicy();
   361         boolean keepPeriodic =
   362             getContinueExistingPeriodicTasksAfterShutdownPolicy();
   363         if (!keepDelayed && !keepPeriodic) {
   364             for (Object e : q.toArray())
   365                 if (e instanceof RunnableScheduledFuture<?>)
   366                     ((RunnableScheduledFuture<?>) e).cancel(false);
   367             q.clear();
   368         }
   369         else {
   370             // Traverse snapshot to avoid iterator exceptions
   371             for (Object e : q.toArray()) {
   372                 if (e instanceof RunnableScheduledFuture) {
   373                     RunnableScheduledFuture<?> t =
   374                         (RunnableScheduledFuture<?>)e;
   375                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
   376                         t.isCancelled()) { // also remove if already cancelled
   377                         if (q.remove(t))
   378                             t.cancel(false);
   379                     }
   380                 }
   381             }
   382         }
   383         tryTerminate();
   384     }
   385 
   386     /**
   387      * Modifies or replaces the task used to execute a runnable.
   388      * This method can be used to override the concrete
   389      * class used for managing internal tasks.
   390      * The default implementation simply returns the given task.
   391      *
   392      * @param runnable the submitted Runnable
   393      * @param task the task created to execute the runnable
   394      * @return a task that can execute the runnable
   395      * @since 1.6
   396      */
   397     protected <V> RunnableScheduledFuture<V> decorateTask(
   398         Runnable runnable, RunnableScheduledFuture<V> task) {
   399         return task;
   400     }
   401 
   402     /**
   403      * Modifies or replaces the task used to execute a callable.
   404      * This method can be used to override the concrete
   405      * class used for managing internal tasks.
   406      * The default implementation simply returns the given task.
   407      *
   408      * @param callable the submitted Callable
   409      * @param task the task created to execute the callable
   410      * @return a task that can execute the callable
   411      * @since 1.6
   412      */
   413     protected <V> RunnableScheduledFuture<V> decorateTask(
   414         Callable<V> callable, RunnableScheduledFuture<V> task) {
   415         return task;
   416     }
   417 
   418     /**
   419      * Creates a new {@code ScheduledThreadPoolExecutor} with the
   420      * given core pool size.
   421      *
   422      * @param corePoolSize the number of threads to keep in the pool, even
   423      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   424      * @throws IllegalArgumentException if {@code corePoolSize < 0}
   425      */
   426     public ScheduledThreadPoolExecutor(int corePoolSize) {
   427         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   428               new DelayedWorkQueue());
   429     }
   430 
   431     /**
   432      * Creates a new {@code ScheduledThreadPoolExecutor} with the
   433      * given initial parameters.
   434      *
   435      * @param corePoolSize the number of threads to keep in the pool, even
   436      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   437      * @param threadFactory the factory to use when the executor
   438      *        creates a new thread
   439      * @throws IllegalArgumentException if {@code corePoolSize < 0}
   440      * @throws NullPointerException if {@code threadFactory} is null
   441      */
   442     public ScheduledThreadPoolExecutor(int corePoolSize,
   443                                        ThreadFactory threadFactory) {
   444         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   445               new DelayedWorkQueue(), threadFactory);
   446     }
   447 
   448     /**
   449      * Creates a new ScheduledThreadPoolExecutor with the given
   450      * initial parameters.
   451      *
   452      * @param corePoolSize the number of threads to keep in the pool, even
   453      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   454      * @param handler the handler to use when execution is blocked
   455      *        because the thread bounds and queue capacities are reached
   456      * @throws IllegalArgumentException if {@code corePoolSize < 0}
   457      * @throws NullPointerException if {@code handler} is null
   458      */
   459     public ScheduledThreadPoolExecutor(int corePoolSize,
   460                                        RejectedExecutionHandler handler) {
   461         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   462               new DelayedWorkQueue(), handler);
   463     }
   464 
   465     /**
   466      * Creates a new ScheduledThreadPoolExecutor with the given
   467      * initial parameters.
   468      *
   469      * @param corePoolSize the number of threads to keep in the pool, even
   470      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   471      * @param threadFactory the factory to use when the executor
   472      *        creates a new thread
   473      * @param handler the handler to use when execution is blocked
   474      *        because the thread bounds and queue capacities are reached
   475      * @throws IllegalArgumentException if {@code corePoolSize < 0}
   476      * @throws NullPointerException if {@code threadFactory} or
   477      *         {@code handler} is null
   478      */
   479     public ScheduledThreadPoolExecutor(int corePoolSize,
   480                                        ThreadFactory threadFactory,
   481                                        RejectedExecutionHandler handler) {
   482         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
   483               new DelayedWorkQueue(), threadFactory, handler);
   484     }
   485 
   486     /**
   487      * Returns the trigger time of a delayed action.
   488      */
   489     private long triggerTime(long delay, TimeUnit unit) {
   490         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
   491     }
   492 
   493     /**
   494      * Returns the trigger time of a delayed action.
   495      */
   496     long triggerTime(long delay) {
   497         return now() +
   498             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
   499     }
   500 
   501     /**
   502      * Constrains the values of all delays in the queue to be within
   503      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
   504      * This may occur if a task is eligible to be dequeued, but has
   505      * not yet been, while some other task is added with a delay of
   506      * Long.MAX_VALUE.
   507      */
   508     private long overflowFree(long delay) {
   509         Delayed head = (Delayed) super.getQueue().peek();
   510         if (head != null) {
   511             long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
   512             if (headDelay < 0 && (delay - headDelay < 0))
   513                 delay = Long.MAX_VALUE + headDelay;
   514         }
   515         return delay;
   516     }
   517 
   518     /**
   519      * @throws RejectedExecutionException {@inheritDoc}
   520      * @throws NullPointerException       {@inheritDoc}
   521      */
   522     public ScheduledFuture<?> schedule(Runnable command,
   523                                        long delay,
   524                                        TimeUnit unit) {
   525         if (command == null || unit == null)
   526             throw new NullPointerException();
   527         RunnableScheduledFuture<?> t = decorateTask(command,
   528             new ScheduledFutureTask<Void>(command, null,
   529                                           triggerTime(delay, unit)));
   530         delayedExecute(t);
   531         return t;
   532     }
   533 
   534     /**
   535      * @throws RejectedExecutionException {@inheritDoc}
   536      * @throws NullPointerException       {@inheritDoc}
   537      */
   538     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
   539                                            long delay,
   540                                            TimeUnit unit) {
   541         if (callable == null || unit == null)
   542             throw new NullPointerException();
   543         RunnableScheduledFuture<V> t = decorateTask(callable,
   544             new ScheduledFutureTask<V>(callable,
   545                                        triggerTime(delay, unit)));
   546         delayedExecute(t);
   547         return t;
   548     }
   549 
   550     /**
   551      * @throws RejectedExecutionException {@inheritDoc}
   552      * @throws NullPointerException       {@inheritDoc}
   553      * @throws IllegalArgumentException   {@inheritDoc}
   554      */
   555     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
   556                                                   long initialDelay,
   557                                                   long period,
   558                                                   TimeUnit unit) {
   559         if (command == null || unit == null)
   560             throw new NullPointerException();
   561         if (period <= 0)
   562             throw new IllegalArgumentException();
   563         ScheduledFutureTask<Void> sft =
   564             new ScheduledFutureTask<Void>(command,
   565                                           null,
   566                                           triggerTime(initialDelay, unit),
   567                                           unit.toNanos(period));
   568         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   569         sft.outerTask = t;
   570         delayedExecute(t);
   571         return t;
   572     }
   573 
   574     /**
   575      * @throws RejectedExecutionException {@inheritDoc}
   576      * @throws NullPointerException       {@inheritDoc}
   577      * @throws IllegalArgumentException   {@inheritDoc}
   578      */
   579     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
   580                                                      long initialDelay,
   581                                                      long delay,
   582                                                      TimeUnit unit) {
   583         if (command == null || unit == null)
   584             throw new NullPointerException();
   585         if (delay <= 0)
   586             throw new IllegalArgumentException();
   587         ScheduledFutureTask<Void> sft =
   588             new ScheduledFutureTask<Void>(command,
   589                                           null,
   590                                           triggerTime(initialDelay, unit),
   591                                           unit.toNanos(-delay));
   592         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   593         sft.outerTask = t;
   594         delayedExecute(t);
   595         return t;
   596     }
   597 
   598     /**
   599      * Executes {@code command} with zero required delay.
   600      * This has effect equivalent to
   601      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
   602      * Note that inspections of the queue and of the list returned by
   603      * {@code shutdownNow} will access the zero-delayed
   604      * {@link ScheduledFuture}, not the {@code command} itself.
   605      *
   606      * <p>A consequence of the use of {@code ScheduledFuture} objects is
   607      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
   608      * called with a null second {@code Throwable} argument, even if the
   609      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
   610      * thrown by such a task can be obtained via {@link Future#get}.
   611      *
   612      * @throws RejectedExecutionException at discretion of
   613      *         {@code RejectedExecutionHandler}, if the task
   614      *         cannot be accepted for execution because the
   615      *         executor has been shut down
   616      * @throws NullPointerException {@inheritDoc}
   617      */
   618     public void execute(Runnable command) {
   619         schedule(command, 0, TimeUnit.NANOSECONDS);
   620     }
   621 
   622     // Override AbstractExecutorService methods
   623 
   624     /**
   625      * @throws RejectedExecutionException {@inheritDoc}
   626      * @throws NullPointerException       {@inheritDoc}
   627      */
   628     public Future<?> submit(Runnable task) {
   629         return schedule(task, 0, TimeUnit.NANOSECONDS);
   630     }
   631 
   632     /**
   633      * @throws RejectedExecutionException {@inheritDoc}
   634      * @throws NullPointerException       {@inheritDoc}
   635      */
   636     public <T> Future<T> submit(Runnable task, T result) {
   637         return schedule(Executors.callable(task, result),
   638                         0, TimeUnit.NANOSECONDS);
   639     }
   640 
   641     /**
   642      * @throws RejectedExecutionException {@inheritDoc}
   643      * @throws NullPointerException       {@inheritDoc}
   644      */
   645     public <T> Future<T> submit(Callable<T> task) {
   646         return schedule(task, 0, TimeUnit.NANOSECONDS);
   647     }
   648 
   649     /**
   650      * Sets the policy on whether to continue executing existing
   651      * periodic tasks even when this executor has been {@code shutdown}.
   652      * In this case, these tasks will only terminate upon
   653      * {@code shutdownNow} or after setting the policy to
   654      * {@code false} when already shutdown.
   655      * This value is by default {@code false}.
   656      *
   657      * @param value if {@code true}, continue after shutdown, else don't.
   658      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
   659      */
   660     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
   661         continueExistingPeriodicTasksAfterShutdown = value;
   662         if (!value && isShutdown())
   663             onShutdown();
   664     }
   665 
   666     /**
   667      * Gets the policy on whether to continue executing existing
   668      * periodic tasks even when this executor has been {@code shutdown}.
   669      * In this case, these tasks will only terminate upon
   670      * {@code shutdownNow} or after setting the policy to
   671      * {@code false} when already shutdown.
   672      * This value is by default {@code false}.
   673      *
   674      * @return {@code true} if will continue after shutdown
   675      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
   676      */
   677     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
   678         return continueExistingPeriodicTasksAfterShutdown;
   679     }
   680 
   681     /**
   682      * Sets the policy on whether to execute existing delayed
   683      * tasks even when this executor has been {@code shutdown}.
   684      * In this case, these tasks will only terminate upon
   685      * {@code shutdownNow}, or after setting the policy to
   686      * {@code false} when already shutdown.
   687      * This value is by default {@code true}.
   688      *
   689      * @param value if {@code true}, execute after shutdown, else don't.
   690      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
   691      */
   692     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
   693         executeExistingDelayedTasksAfterShutdown = value;
   694         if (!value && isShutdown())
   695             onShutdown();
   696     }
   697 
   698     /**
   699      * Gets the policy on whether to execute existing delayed
   700      * tasks even when this executor has been {@code shutdown}.
   701      * In this case, these tasks will only terminate upon
   702      * {@code shutdownNow}, or after setting the policy to
   703      * {@code false} when already shutdown.
   704      * This value is by default {@code true}.
   705      *
   706      * @return {@code true} if will execute after shutdown
   707      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
   708      */
   709     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
   710         return executeExistingDelayedTasksAfterShutdown;
   711     }
   712 
   713     /**
   714      * Sets the policy on whether cancelled tasks should be immediately
   715      * removed from the work queue at time of cancellation.  This value is
   716      * by default {@code false}.
   717      *
   718      * @param value if {@code true}, remove on cancellation, else don't
   719      * @see #getRemoveOnCancelPolicy
   720      * @since 1.7
   721      */
   722     public void setRemoveOnCancelPolicy(boolean value) {
   723         removeOnCancel = value;
   724     }
   725 
   726     /**
   727      * Gets the policy on whether cancelled tasks should be immediately
   728      * removed from the work queue at time of cancellation.  This value is
   729      * by default {@code false}.
   730      *
   731      * @return {@code true} if cancelled tasks are immediately removed
   732      *         from the queue
   733      * @see #setRemoveOnCancelPolicy
   734      * @since 1.7
   735      */
   736     public boolean getRemoveOnCancelPolicy() {
   737         return removeOnCancel;
   738     }
   739 
   740     /**
   741      * Initiates an orderly shutdown in which previously submitted
   742      * tasks are executed, but no new tasks will be accepted.
   743      * Invocation has no additional effect if already shut down.
   744      *
   745      * <p>This method does not wait for previously submitted tasks to
   746      * complete execution.  Use {@link #awaitTermination awaitTermination}
   747      * to do that.
   748      *
   749      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
   750      * has been set {@code false}, existing delayed tasks whose delays
   751      * have not yet elapsed are cancelled.  And unless the {@code
   752      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
   753      * {@code true}, future executions of existing periodic tasks will
   754      * be cancelled.
   755      *
   756      * @throws SecurityException {@inheritDoc}
   757      */
   758     public void shutdown() {
   759         super.shutdown();
   760     }
   761 
   762     /**
   763      * Attempts to stop all actively executing tasks, halts the
   764      * processing of waiting tasks, and returns a list of the tasks
   765      * that were awaiting execution.
   766      *
   767      * <p>This method does not wait for actively executing tasks to
   768      * terminate.  Use {@link #awaitTermination awaitTermination} to
   769      * do that.
   770      *
   771      * <p>There are no guarantees beyond best-effort attempts to stop
   772      * processing actively executing tasks.  This implementation
   773      * cancels tasks via {@link Thread#interrupt}, so any task that
   774      * fails to respond to interrupts may never terminate.
   775      *
   776      * @return list of tasks that never commenced execution.
   777      *         Each element of this list is a {@link ScheduledFuture},
   778      *         including those tasks submitted using {@code execute},
   779      *         which are for scheduling purposes used as the basis of a
   780      *         zero-delay {@code ScheduledFuture}.
   781      * @throws SecurityException {@inheritDoc}
   782      */
   783     public List<Runnable> shutdownNow() {
   784         return super.shutdownNow();
   785     }
   786 
   787     /**
   788      * Returns the task queue used by this executor.  Each element of
   789      * this queue is a {@link ScheduledFuture}, including those
   790      * tasks submitted using {@code execute} which are for scheduling
   791      * purposes used as the basis of a zero-delay
   792      * {@code ScheduledFuture}.  Iteration over this queue is
   793      * <em>not</em> guaranteed to traverse tasks in the order in
   794      * which they will execute.
   795      *
   796      * @return the task queue
   797      */
   798     public BlockingQueue<Runnable> getQueue() {
   799         return super.getQueue();
   800     }
   801 
   802     /**
   803      * Specialized delay queue. To mesh with TPE declarations, this
   804      * class must be declared as a BlockingQueue<Runnable> even though
   805      * it can only hold RunnableScheduledFutures.
   806      */
   807     static class DelayedWorkQueue extends AbstractQueue<Runnable>
   808         implements BlockingQueue<Runnable> {
   809 
   810         /*
   811          * A DelayedWorkQueue is based on a heap-based data structure
   812          * like those in DelayQueue and PriorityQueue, except that
   813          * every ScheduledFutureTask also records its index into the
   814          * heap array. This eliminates the need to find a task upon
   815          * cancellation, greatly speeding up removal (down from O(n)
   816          * to O(log n)), and reducing garbage retention that would
   817          * otherwise occur by waiting for the element to rise to top
   818          * before clearing. But because the queue may also hold
   819          * RunnableScheduledFutures that are not ScheduledFutureTasks,
   820          * we are not guaranteed to have such indices available, in
   821          * which case we fall back to linear search. (We expect that
   822          * most tasks will not be decorated, and that the faster cases
   823          * will be much more common.)
   824          *
   825          * All heap operations must record index changes -- mainly
   826          * within siftUp and siftDown. Upon removal, a task's
   827          * heapIndex is set to -1. Note that ScheduledFutureTasks can
   828          * appear at most once in the queue (this need not be true for
   829          * other kinds of tasks or work queues), so are uniquely
   830          * identified by heapIndex.
   831          */
   832 
   833         private static final int INITIAL_CAPACITY = 16;
   834         private RunnableScheduledFuture[] queue =
   835             new RunnableScheduledFuture[INITIAL_CAPACITY];
   836         private final ReentrantLock lock = new ReentrantLock();
   837         private int size = 0;
   838 
   839         /**
   840          * Thread designated to wait for the task at the head of the
   841          * queue.  This variant of the Leader-Follower pattern
   842          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
   843          * minimize unnecessary timed waiting.  When a thread becomes
   844          * the leader, it waits only for the next delay to elapse, but
   845          * other threads await indefinitely.  The leader thread must
   846          * signal some other thread before returning from take() or
   847          * poll(...), unless some other thread becomes leader in the
   848          * interim.  Whenever the head of the queue is replaced with a
   849          * task with an earlier expiration time, the leader field is
   850          * invalidated by being reset to null, and some waiting
   851          * thread, but not necessarily the current leader, is
   852          * signalled.  So waiting threads must be prepared to acquire
   853          * and lose leadership while waiting.
   854          */
   855         private Thread leader = null;
   856 
   857         /**
   858          * Condition signalled when a newer task becomes available at the
   859          * head of the queue or a new thread may need to become leader.
   860          */
   861         private final Condition available = lock.newCondition();
   862 
   863         /**
   864          * Set f's heapIndex if it is a ScheduledFutureTask.
   865          */
   866         private void setIndex(RunnableScheduledFuture f, int idx) {
   867             if (f instanceof ScheduledFutureTask)
   868                 ((ScheduledFutureTask)f).heapIndex = idx;
   869         }
   870 
   871         /**
   872          * Sift element added at bottom up to its heap-ordered spot.
   873          * Call only when holding lock.
   874          */
   875         private void siftUp(int k, RunnableScheduledFuture key) {
   876             while (k > 0) {
   877                 int parent = (k - 1) >>> 1;
   878                 RunnableScheduledFuture e = queue[parent];
   879                 if (key.compareTo(e) >= 0)
   880                     break;
   881                 queue[k] = e;
   882                 setIndex(e, k);
   883                 k = parent;
   884             }
   885             queue[k] = key;
   886             setIndex(key, k);
   887         }
   888 
   889         /**
   890          * Sift element added at top down to its heap-ordered spot.
   891          * Call only when holding lock.
   892          */
   893         private void siftDown(int k, RunnableScheduledFuture key) {
   894             int half = size >>> 1;
   895             while (k < half) {
   896                 int child = (k << 1) + 1;
   897                 RunnableScheduledFuture c = queue[child];
   898                 int right = child + 1;
   899                 if (right < size && c.compareTo(queue[right]) > 0)
   900                     c = queue[child = right];
   901                 if (key.compareTo(c) <= 0)
   902                     break;
   903                 queue[k] = c;
   904                 setIndex(c, k);
   905                 k = child;
   906             }
   907             queue[k] = key;
   908             setIndex(key, k);
   909         }
   910 
   911         /**
   912          * Resize the heap array.  Call only when holding lock.
   913          */
   914         private void grow() {
   915             int oldCapacity = queue.length;
   916             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
   917             if (newCapacity < 0) // overflow
   918                 newCapacity = Integer.MAX_VALUE;
   919             queue = Arrays.copyOf(queue, newCapacity);
   920         }
   921 
   922         /**
   923          * Find index of given object, or -1 if absent
   924          */
   925         private int indexOf(Object x) {
   926             if (x != null) {
   927                 if (x instanceof ScheduledFutureTask) {
   928                     int i = ((ScheduledFutureTask) x).heapIndex;
   929                     // Sanity check; x could conceivably be a
   930                     // ScheduledFutureTask from some other pool.
   931                     if (i >= 0 && i < size && queue[i] == x)
   932                         return i;
   933                 } else {
   934                     for (int i = 0; i < size; i++)
   935                         if (x.equals(queue[i]))
   936                             return i;
   937                 }
   938             }
   939             return -1;
   940         }
   941 
   942         public boolean contains(Object x) {
   943             final ReentrantLock lock = this.lock;
   944             lock.lock();
   945             try {
   946                 return indexOf(x) != -1;
   947             } finally {
   948                 lock.unlock();
   949             }
   950         }
   951 
   952         public boolean remove(Object x) {
   953             final ReentrantLock lock = this.lock;
   954             lock.lock();
   955             try {
   956                 int i = indexOf(x);
   957                 if (i < 0)
   958                     return false;
   959 
   960                 setIndex(queue[i], -1);
   961                 int s = --size;
   962                 RunnableScheduledFuture replacement = queue[s];
   963                 queue[s] = null;
   964                 if (s != i) {
   965                     siftDown(i, replacement);
   966                     if (queue[i] == replacement)
   967                         siftUp(i, replacement);
   968                 }
   969                 return true;
   970             } finally {
   971                 lock.unlock();
   972             }
   973         }
   974 
   975         public int size() {
   976             final ReentrantLock lock = this.lock;
   977             lock.lock();
   978             try {
   979                 return size;
   980             } finally {
   981                 lock.unlock();
   982             }
   983         }
   984 
   985         public boolean isEmpty() {
   986             return size() == 0;
   987         }
   988 
   989         public int remainingCapacity() {
   990             return Integer.MAX_VALUE;
   991         }
   992 
   993         public RunnableScheduledFuture peek() {
   994             final ReentrantLock lock = this.lock;
   995             lock.lock();
   996             try {
   997                 return queue[0];
   998             } finally {
   999                 lock.unlock();
  1000             }
  1001         }
  1002 
  1003         public boolean offer(Runnable x) {
  1004             if (x == null)
  1005                 throw new NullPointerException();
  1006             RunnableScheduledFuture e = (RunnableScheduledFuture)x;
  1007             final ReentrantLock lock = this.lock;
  1008             lock.lock();
  1009             try {
  1010                 int i = size;
  1011                 if (i >= queue.length)
  1012                     grow();
  1013                 size = i + 1;
  1014                 if (i == 0) {
  1015                     queue[0] = e;
  1016                     setIndex(e, 0);
  1017                 } else {
  1018                     siftUp(i, e);
  1019                 }
  1020                 if (queue[0] == e) {
  1021                     leader = null;
  1022                     available.signal();
  1023                 }
  1024             } finally {
  1025                 lock.unlock();
  1026             }
  1027             return true;
  1028         }
  1029 
  1030         public void put(Runnable e) {
  1031             offer(e);
  1032         }
  1033 
  1034         public boolean add(Runnable e) {
  1035             return offer(e);
  1036         }
  1037 
  1038         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
  1039             return offer(e);
  1040         }
  1041 
  1042         /**
  1043          * Performs common bookkeeping for poll and take: Replaces
  1044          * first element with last and sifts it down.  Call only when
  1045          * holding lock.
  1046          * @param f the task to remove and return
  1047          */
  1048         private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
  1049             int s = --size;
  1050             RunnableScheduledFuture x = queue[s];
  1051             queue[s] = null;
  1052             if (s != 0)
  1053                 siftDown(0, x);
  1054             setIndex(f, -1);
  1055             return f;
  1056         }
  1057 
  1058         public RunnableScheduledFuture poll() {
  1059             final ReentrantLock lock = this.lock;
  1060             lock.lock();
  1061             try {
  1062                 RunnableScheduledFuture first = queue[0];
  1063                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  1064                     return null;
  1065                 else
  1066                     return finishPoll(first);
  1067             } finally {
  1068                 lock.unlock();
  1069             }
  1070         }
  1071 
  1072         public RunnableScheduledFuture take() throws InterruptedException {
  1073             final ReentrantLock lock = this.lock;
  1074             lock.lockInterruptibly();
  1075             try {
  1076                 for (;;) {
  1077                     RunnableScheduledFuture first = queue[0];
  1078                     if (first == null)
  1079                         available.await();
  1080                     else {
  1081                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
  1082                         if (delay <= 0)
  1083                             return finishPoll(first);
  1084                         else if (leader != null)
  1085                             available.await();
  1086                         else {
  1087                             Thread thisThread = Thread.currentThread();
  1088                             leader = thisThread;
  1089                             try {
  1090                                 available.awaitNanos(delay);
  1091                             } finally {
  1092                                 if (leader == thisThread)
  1093                                     leader = null;
  1094                             }
  1095                         }
  1096                     }
  1097                 }
  1098             } finally {
  1099                 if (leader == null && queue[0] != null)
  1100                     available.signal();
  1101                 lock.unlock();
  1102             }
  1103         }
  1104 
  1105         public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
  1106             throws InterruptedException {
  1107             long nanos = unit.toNanos(timeout);
  1108             final ReentrantLock lock = this.lock;
  1109             lock.lockInterruptibly();
  1110             try {
  1111                 for (;;) {
  1112                     RunnableScheduledFuture first = queue[0];
  1113                     if (first == null) {
  1114                         if (nanos <= 0)
  1115                             return null;
  1116                         else
  1117                             nanos = available.awaitNanos(nanos);
  1118                     } else {
  1119                         long delay = first.getDelay(TimeUnit.NANOSECONDS);
  1120                         if (delay <= 0)
  1121                             return finishPoll(first);
  1122                         if (nanos <= 0)
  1123                             return null;
  1124                         if (nanos < delay || leader != null)
  1125                             nanos = available.awaitNanos(nanos);
  1126                         else {
  1127                             Thread thisThread = Thread.currentThread();
  1128                             leader = thisThread;
  1129                             try {
  1130                                 long timeLeft = available.awaitNanos(delay);
  1131                                 nanos -= delay - timeLeft;
  1132                             } finally {
  1133                                 if (leader == thisThread)
  1134                                     leader = null;
  1135                             }
  1136                         }
  1137                     }
  1138                 }
  1139             } finally {
  1140                 if (leader == null && queue[0] != null)
  1141                     available.signal();
  1142                 lock.unlock();
  1143             }
  1144         }
  1145 
  1146         public void clear() {
  1147             final ReentrantLock lock = this.lock;
  1148             lock.lock();
  1149             try {
  1150                 for (int i = 0; i < size; i++) {
  1151                     RunnableScheduledFuture t = queue[i];
  1152                     if (t != null) {
  1153                         queue[i] = null;
  1154                         setIndex(t, -1);
  1155                     }
  1156                 }
  1157                 size = 0;
  1158             } finally {
  1159                 lock.unlock();
  1160             }
  1161         }
  1162 
  1163         /**
  1164          * Return and remove first element only if it is expired.
  1165          * Used only by drainTo.  Call only when holding lock.
  1166          */
  1167         private RunnableScheduledFuture pollExpired() {
  1168             RunnableScheduledFuture first = queue[0];
  1169             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
  1170                 return null;
  1171             return finishPoll(first);
  1172         }
  1173 
  1174         public int drainTo(Collection<? super Runnable> c) {
  1175             if (c == null)
  1176                 throw new NullPointerException();
  1177             if (c == this)
  1178                 throw new IllegalArgumentException();
  1179             final ReentrantLock lock = this.lock;
  1180             lock.lock();
  1181             try {
  1182                 RunnableScheduledFuture first;
  1183                 int n = 0;
  1184                 while ((first = pollExpired()) != null) {
  1185                     c.add(first);
  1186                     ++n;
  1187                 }
  1188                 return n;
  1189             } finally {
  1190                 lock.unlock();
  1191             }
  1192         }
  1193 
  1194         public int drainTo(Collection<? super Runnable> c, int maxElements) {
  1195             if (c == null)
  1196                 throw new NullPointerException();
  1197             if (c == this)
  1198                 throw new IllegalArgumentException();
  1199             if (maxElements <= 0)
  1200                 return 0;
  1201             final ReentrantLock lock = this.lock;
  1202             lock.lock();
  1203             try {
  1204                 RunnableScheduledFuture first;
  1205                 int n = 0;
  1206                 while (n < maxElements && (first = pollExpired()) != null) {
  1207                     c.add(first);
  1208                     ++n;
  1209                 }
  1210                 return n;
  1211             } finally {
  1212                 lock.unlock();
  1213             }
  1214         }
  1215 
  1216         public Object[] toArray() {
  1217             final ReentrantLock lock = this.lock;
  1218             lock.lock();
  1219             try {
  1220                 return Arrays.copyOf(queue, size, Object[].class);
  1221             } finally {
  1222                 lock.unlock();
  1223             }
  1224         }
  1225 
  1226         @SuppressWarnings("unchecked")
  1227         public <T> T[] toArray(T[] a) {
  1228             final ReentrantLock lock = this.lock;
  1229             lock.lock();
  1230             try {
  1231                 if (a.length < size)
  1232                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
  1233                 System.arraycopy(queue, 0, a, 0, size);
  1234                 if (a.length > size)
  1235                     a[size] = null;
  1236                 return a;
  1237             } finally {
  1238                 lock.unlock();
  1239             }
  1240         }
  1241 
  1242         public Iterator<Runnable> iterator() {
  1243             return new Itr(Arrays.copyOf(queue, size));
  1244         }
  1245 
  1246         /**
  1247          * Snapshot iterator that works off copy of underlying q array.
  1248          */
  1249         private class Itr implements Iterator<Runnable> {
  1250             final RunnableScheduledFuture[] array;
  1251             int cursor = 0;     // index of next element to return
  1252             int lastRet = -1;   // index of last element, or -1 if no such
  1253 
  1254             Itr(RunnableScheduledFuture[] array) {
  1255                 this.array = array;
  1256             }
  1257 
  1258             public boolean hasNext() {
  1259                 return cursor < array.length;
  1260             }
  1261 
  1262             public Runnable next() {
  1263                 if (cursor >= array.length)
  1264                     throw new NoSuchElementException();
  1265                 lastRet = cursor;
  1266                 return array[cursor++];
  1267             }
  1268 
  1269             public void remove() {
  1270                 if (lastRet < 0)
  1271                     throw new IllegalStateException();
  1272                 DelayedWorkQueue.this.remove(array[lastRet]);
  1273                 lastRet = -1;
  1274             }
  1275         }
  1276     }
  1277 }