t = decorateTask(command, sft);
jaroslav@1890: sft.outerTask = t;
jaroslav@1890: delayedExecute(t);
jaroslav@1890: return t;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Executes {@code command} with zero required delay.
jaroslav@1890: * This has effect equivalent to
jaroslav@1890: * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
jaroslav@1890: * Note that inspections of the queue and of the list returned by
jaroslav@1890: * {@code shutdownNow} will access the zero-delayed
jaroslav@1890: * {@link ScheduledFuture}, not the {@code command} itself.
jaroslav@1890: *
jaroslav@1890: * A consequence of the use of {@code ScheduledFuture} objects is
jaroslav@1890: * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
jaroslav@1890: * called with a null second {@code Throwable} argument, even if the
jaroslav@1890: * {@code command} terminated abruptly. Instead, the {@code Throwable}
jaroslav@1890: * thrown by such a task can be obtained via {@link Future#get}.
jaroslav@1890: *
jaroslav@1890: * @throws RejectedExecutionException at discretion of
jaroslav@1890: * {@code RejectedExecutionHandler}, if the task
jaroslav@1890: * cannot be accepted for execution because the
jaroslav@1890: * executor has been shut down
jaroslav@1890: * @throws NullPointerException {@inheritDoc}
jaroslav@1890: */
jaroslav@1890: public void execute(Runnable command) {
jaroslav@1890: schedule(command, 0, TimeUnit.NANOSECONDS);
jaroslav@1890: }
jaroslav@1890:
// Override AbstractExecutorService methods
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc}
jaroslav@1890: * @throws NullPointerException {@inheritDoc}
jaroslav@1890: */
jaroslav@1890: public Future> submit(Runnable task) {
jaroslav@1890: return schedule(task, 0, TimeUnit.NANOSECONDS);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc}
jaroslav@1890: * @throws NullPointerException {@inheritDoc}
jaroslav@1890: */
jaroslav@1890: public Future submit(Runnable task, T result) {
jaroslav@1890: return schedule(Executors.callable(task, result),
jaroslav@1890: 0, TimeUnit.NANOSECONDS);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * @throws RejectedExecutionException {@inheritDoc}
jaroslav@1890: * @throws NullPointerException {@inheritDoc}
jaroslav@1890: */
jaroslav@1890: public Future submit(Callable task) {
jaroslav@1890: return schedule(task, 0, TimeUnit.NANOSECONDS);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Sets the policy on whether to continue executing existing
jaroslav@1890: * periodic tasks even when this executor has been {@code shutdown}.
jaroslav@1890: * In this case, these tasks will only terminate upon
jaroslav@1890: * {@code shutdownNow} or after setting the policy to
jaroslav@1890: * {@code false} when already shutdown.
jaroslav@1890: * This value is by default {@code false}.
jaroslav@1890: *
jaroslav@1890: * @param value if {@code true}, continue after shutdown, else don't.
jaroslav@1890: * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
jaroslav@1890: */
jaroslav@1890: public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
jaroslav@1890: continueExistingPeriodicTasksAfterShutdown = value;
jaroslav@1890: if (!value && isShutdown())
jaroslav@1890: onShutdown();
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Gets the policy on whether to continue executing existing
jaroslav@1890: * periodic tasks even when this executor has been {@code shutdown}.
jaroslav@1890: * In this case, these tasks will only terminate upon
jaroslav@1890: * {@code shutdownNow} or after setting the policy to
jaroslav@1890: * {@code false} when already shutdown.
jaroslav@1890: * This value is by default {@code false}.
jaroslav@1890: *
jaroslav@1890: * @return {@code true} if will continue after shutdown
jaroslav@1890: * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
jaroslav@1890: */
jaroslav@1890: public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
jaroslav@1890: return continueExistingPeriodicTasksAfterShutdown;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Sets the policy on whether to execute existing delayed
jaroslav@1890: * tasks even when this executor has been {@code shutdown}.
jaroslav@1890: * In this case, these tasks will only terminate upon
jaroslav@1890: * {@code shutdownNow}, or after setting the policy to
jaroslav@1890: * {@code false} when already shutdown.
jaroslav@1890: * This value is by default {@code true}.
jaroslav@1890: *
jaroslav@1890: * @param value if {@code true}, execute after shutdown, else don't.
jaroslav@1890: * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
jaroslav@1890: */
jaroslav@1890: public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
jaroslav@1890: executeExistingDelayedTasksAfterShutdown = value;
jaroslav@1890: if (!value && isShutdown())
jaroslav@1890: onShutdown();
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Gets the policy on whether to execute existing delayed
jaroslav@1890: * tasks even when this executor has been {@code shutdown}.
jaroslav@1890: * In this case, these tasks will only terminate upon
jaroslav@1890: * {@code shutdownNow}, or after setting the policy to
jaroslav@1890: * {@code false} when already shutdown.
jaroslav@1890: * This value is by default {@code true}.
jaroslav@1890: *
jaroslav@1890: * @return {@code true} if will execute after shutdown
jaroslav@1890: * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
jaroslav@1890: */
jaroslav@1890: public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
jaroslav@1890: return executeExistingDelayedTasksAfterShutdown;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Sets the policy on whether cancelled tasks should be immediately
jaroslav@1890: * removed from the work queue at time of cancellation. This value is
jaroslav@1890: * by default {@code false}.
jaroslav@1890: *
jaroslav@1890: * @param value if {@code true}, remove on cancellation, else don't
jaroslav@1890: * @see #getRemoveOnCancelPolicy
jaroslav@1890: * @since 1.7
jaroslav@1890: */
jaroslav@1890: public void setRemoveOnCancelPolicy(boolean value) {
jaroslav@1890: removeOnCancel = value;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Gets the policy on whether cancelled tasks should be immediately
jaroslav@1890: * removed from the work queue at time of cancellation. This value is
jaroslav@1890: * by default {@code false}.
jaroslav@1890: *
jaroslav@1890: * @return {@code true} if cancelled tasks are immediately removed
jaroslav@1890: * from the queue
jaroslav@1890: * @see #setRemoveOnCancelPolicy
jaroslav@1890: * @since 1.7
jaroslav@1890: */
jaroslav@1890: public boolean getRemoveOnCancelPolicy() {
jaroslav@1890: return removeOnCancel;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Initiates an orderly shutdown in which previously submitted
jaroslav@1890: * tasks are executed, but no new tasks will be accepted.
jaroslav@1890: * Invocation has no additional effect if already shut down.
jaroslav@1890: *
jaroslav@1890: * This method does not wait for previously submitted tasks to
jaroslav@1890: * complete execution. Use {@link #awaitTermination awaitTermination}
jaroslav@1890: * to do that.
jaroslav@1890: *
jaroslav@1890: *
If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
jaroslav@1890: * has been set {@code false}, existing delayed tasks whose delays
jaroslav@1890: * have not yet elapsed are cancelled. And unless the {@code
jaroslav@1890: * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
jaroslav@1890: * {@code true}, future executions of existing periodic tasks will
jaroslav@1890: * be cancelled.
jaroslav@1890: *
jaroslav@1890: * @throws SecurityException {@inheritDoc}
jaroslav@1890: */
jaroslav@1890: public void shutdown() {
jaroslav@1890: super.shutdown();
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Attempts to stop all actively executing tasks, halts the
jaroslav@1890: * processing of waiting tasks, and returns a list of the tasks
jaroslav@1890: * that were awaiting execution.
jaroslav@1890: *
jaroslav@1890: *
This method does not wait for actively executing tasks to
jaroslav@1890: * terminate. Use {@link #awaitTermination awaitTermination} to
jaroslav@1890: * do that.
jaroslav@1890: *
jaroslav@1890: *
There are no guarantees beyond best-effort attempts to stop
jaroslav@1890: * processing actively executing tasks. This implementation
jaroslav@1890: * cancels tasks via {@link Thread#interrupt}, so any task that
jaroslav@1890: * fails to respond to interrupts may never terminate.
jaroslav@1890: *
jaroslav@1890: * @return list of tasks that never commenced execution.
jaroslav@1890: * Each element of this list is a {@link ScheduledFuture},
jaroslav@1890: * including those tasks submitted using {@code execute},
jaroslav@1890: * which are for scheduling purposes used as the basis of a
jaroslav@1890: * zero-delay {@code ScheduledFuture}.
jaroslav@1890: * @throws SecurityException {@inheritDoc}
jaroslav@1890: */
jaroslav@1890: public List shutdownNow() {
jaroslav@1890: return super.shutdownNow();
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Returns the task queue used by this executor. Each element of
jaroslav@1890: * this queue is a {@link ScheduledFuture}, including those
jaroslav@1890: * tasks submitted using {@code execute} which are for scheduling
jaroslav@1890: * purposes used as the basis of a zero-delay
jaroslav@1890: * {@code ScheduledFuture}. Iteration over this queue is
jaroslav@1890: * not guaranteed to traverse tasks in the order in
jaroslav@1890: * which they will execute.
jaroslav@1890: *
jaroslav@1890: * @return the task queue
jaroslav@1890: */
jaroslav@1890: public BlockingQueue getQueue() {
jaroslav@1890: return super.getQueue();
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Specialized delay queue. To mesh with TPE declarations, this
jaroslav@1890: * class must be declared as a BlockingQueue even though
jaroslav@1890: * it can only hold RunnableScheduledFutures.
jaroslav@1890: */
jaroslav@1890: static class DelayedWorkQueue extends AbstractQueue
jaroslav@1890: implements BlockingQueue {
jaroslav@1890:
jaroslav@1890: /*
jaroslav@1890: * A DelayedWorkQueue is based on a heap-based data structure
jaroslav@1890: * like those in DelayQueue and PriorityQueue, except that
jaroslav@1890: * every ScheduledFutureTask also records its index into the
jaroslav@1890: * heap array. This eliminates the need to find a task upon
jaroslav@1890: * cancellation, greatly speeding up removal (down from O(n)
jaroslav@1890: * to O(log n)), and reducing garbage retention that would
jaroslav@1890: * otherwise occur by waiting for the element to rise to top
jaroslav@1890: * before clearing. But because the queue may also hold
jaroslav@1890: * RunnableScheduledFutures that are not ScheduledFutureTasks,
jaroslav@1890: * we are not guaranteed to have such indices available, in
jaroslav@1890: * which case we fall back to linear search. (We expect that
jaroslav@1890: * most tasks will not be decorated, and that the faster cases
jaroslav@1890: * will be much more common.)
jaroslav@1890: *
jaroslav@1890: * All heap operations must record index changes -- mainly
jaroslav@1890: * within siftUp and siftDown. Upon removal, a task's
jaroslav@1890: * heapIndex is set to -1. Note that ScheduledFutureTasks can
jaroslav@1890: * appear at most once in the queue (this need not be true for
jaroslav@1890: * other kinds of tasks or work queues), so are uniquely
jaroslav@1890: * identified by heapIndex.
jaroslav@1890: */
jaroslav@1890:
jaroslav@1890: private static final int INITIAL_CAPACITY = 16;
jaroslav@1890: private RunnableScheduledFuture[] queue =
jaroslav@1890: new RunnableScheduledFuture[INITIAL_CAPACITY];
jaroslav@1890: private final ReentrantLock lock = new ReentrantLock();
jaroslav@1890: private int size = 0;
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Thread designated to wait for the task at the head of the
jaroslav@1890: * queue. This variant of the Leader-Follower pattern
jaroslav@1890: * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
jaroslav@1890: * minimize unnecessary timed waiting. When a thread becomes
jaroslav@1890: * the leader, it waits only for the next delay to elapse, but
jaroslav@1890: * other threads await indefinitely. The leader thread must
jaroslav@1890: * signal some other thread before returning from take() or
jaroslav@1890: * poll(...), unless some other thread becomes leader in the
jaroslav@1890: * interim. Whenever the head of the queue is replaced with a
jaroslav@1890: * task with an earlier expiration time, the leader field is
jaroslav@1890: * invalidated by being reset to null, and some waiting
jaroslav@1890: * thread, but not necessarily the current leader, is
jaroslav@1890: * signalled. So waiting threads must be prepared to acquire
jaroslav@1890: * and lose leadership while waiting.
jaroslav@1890: */
jaroslav@1890: private Thread leader = null;
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Condition signalled when a newer task becomes available at the
jaroslav@1890: * head of the queue or a new thread may need to become leader.
jaroslav@1890: */
jaroslav@1890: private final Condition available = lock.newCondition();
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Set f's heapIndex if it is a ScheduledFutureTask.
jaroslav@1890: */
jaroslav@1890: private void setIndex(RunnableScheduledFuture f, int idx) {
jaroslav@1890: if (f instanceof ScheduledFutureTask)
jaroslav@1890: ((ScheduledFutureTask)f).heapIndex = idx;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Sift element added at bottom up to its heap-ordered spot.
jaroslav@1890: * Call only when holding lock.
jaroslav@1890: */
jaroslav@1890: private void siftUp(int k, RunnableScheduledFuture key) {
jaroslav@1890: while (k > 0) {
jaroslav@1890: int parent = (k - 1) >>> 1;
jaroslav@1890: RunnableScheduledFuture e = queue[parent];
jaroslav@1890: if (key.compareTo(e) >= 0)
jaroslav@1890: break;
jaroslav@1890: queue[k] = e;
jaroslav@1890: setIndex(e, k);
jaroslav@1890: k = parent;
jaroslav@1890: }
jaroslav@1890: queue[k] = key;
jaroslav@1890: setIndex(key, k);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Sift element added at top down to its heap-ordered spot.
jaroslav@1890: * Call only when holding lock.
jaroslav@1890: */
jaroslav@1890: private void siftDown(int k, RunnableScheduledFuture key) {
jaroslav@1890: int half = size >>> 1;
jaroslav@1890: while (k < half) {
jaroslav@1890: int child = (k << 1) + 1;
jaroslav@1890: RunnableScheduledFuture c = queue[child];
jaroslav@1890: int right = child + 1;
jaroslav@1890: if (right < size && c.compareTo(queue[right]) > 0)
jaroslav@1890: c = queue[child = right];
jaroslav@1890: if (key.compareTo(c) <= 0)
jaroslav@1890: break;
jaroslav@1890: queue[k] = c;
jaroslav@1890: setIndex(c, k);
jaroslav@1890: k = child;
jaroslav@1890: }
jaroslav@1890: queue[k] = key;
jaroslav@1890: setIndex(key, k);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Resize the heap array. Call only when holding lock.
jaroslav@1890: */
jaroslav@1890: private void grow() {
jaroslav@1890: int oldCapacity = queue.length;
jaroslav@1890: int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
jaroslav@1890: if (newCapacity < 0) // overflow
jaroslav@1890: newCapacity = Integer.MAX_VALUE;
jaroslav@1890: queue = Arrays.copyOf(queue, newCapacity);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Find index of given object, or -1 if absent
jaroslav@1890: */
jaroslav@1890: private int indexOf(Object x) {
jaroslav@1890: if (x != null) {
jaroslav@1890: if (x instanceof ScheduledFutureTask) {
jaroslav@1890: int i = ((ScheduledFutureTask) x).heapIndex;
jaroslav@1890: // Sanity check; x could conceivably be a
jaroslav@1890: // ScheduledFutureTask from some other pool.
jaroslav@1890: if (i >= 0 && i < size && queue[i] == x)
jaroslav@1890: return i;
jaroslav@1890: } else {
jaroslav@1890: for (int i = 0; i < size; i++)
jaroslav@1890: if (x.equals(queue[i]))
jaroslav@1890: return i;
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: return -1;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public boolean contains(Object x) {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: return indexOf(x) != -1;
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public boolean remove(Object x) {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: int i = indexOf(x);
jaroslav@1890: if (i < 0)
jaroslav@1890: return false;
jaroslav@1890:
jaroslav@1890: setIndex(queue[i], -1);
jaroslav@1890: int s = --size;
jaroslav@1890: RunnableScheduledFuture replacement = queue[s];
jaroslav@1890: queue[s] = null;
jaroslav@1890: if (s != i) {
jaroslav@1890: siftDown(i, replacement);
jaroslav@1890: if (queue[i] == replacement)
jaroslav@1890: siftUp(i, replacement);
jaroslav@1890: }
jaroslav@1890: return true;
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public int size() {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: return size;
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public boolean isEmpty() {
jaroslav@1890: return size() == 0;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public int remainingCapacity() {
jaroslav@1890: return Integer.MAX_VALUE;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public RunnableScheduledFuture peek() {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: return queue[0];
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public boolean offer(Runnable x) {
jaroslav@1890: if (x == null)
jaroslav@1890: throw new NullPointerException();
jaroslav@1890: RunnableScheduledFuture e = (RunnableScheduledFuture)x;
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: int i = size;
jaroslav@1890: if (i >= queue.length)
jaroslav@1890: grow();
jaroslav@1890: size = i + 1;
jaroslav@1890: if (i == 0) {
jaroslav@1890: queue[0] = e;
jaroslav@1890: setIndex(e, 0);
jaroslav@1890: } else {
jaroslav@1890: siftUp(i, e);
jaroslav@1890: }
jaroslav@1890: if (queue[0] == e) {
jaroslav@1890: leader = null;
jaroslav@1890: available.signal();
jaroslav@1890: }
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: return true;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public void put(Runnable e) {
jaroslav@1890: offer(e);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public boolean add(Runnable e) {
jaroslav@1890: return offer(e);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public boolean offer(Runnable e, long timeout, TimeUnit unit) {
jaroslav@1890: return offer(e);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Performs common bookkeeping for poll and take: Replaces
jaroslav@1890: * first element with last and sifts it down. Call only when
jaroslav@1890: * holding lock.
jaroslav@1890: * @param f the task to remove and return
jaroslav@1890: */
jaroslav@1890: private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
jaroslav@1890: int s = --size;
jaroslav@1890: RunnableScheduledFuture x = queue[s];
jaroslav@1890: queue[s] = null;
jaroslav@1890: if (s != 0)
jaroslav@1890: siftDown(0, x);
jaroslav@1890: setIndex(f, -1);
jaroslav@1890: return f;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public RunnableScheduledFuture poll() {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: RunnableScheduledFuture first = queue[0];
jaroslav@1890: if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
jaroslav@1890: return null;
jaroslav@1890: else
jaroslav@1890: return finishPoll(first);
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public RunnableScheduledFuture take() throws InterruptedException {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lockInterruptibly();
jaroslav@1890: try {
jaroslav@1890: for (;;) {
jaroslav@1890: RunnableScheduledFuture first = queue[0];
jaroslav@1890: if (first == null)
jaroslav@1890: available.await();
jaroslav@1890: else {
jaroslav@1890: long delay = first.getDelay(TimeUnit.NANOSECONDS);
jaroslav@1890: if (delay <= 0)
jaroslav@1890: return finishPoll(first);
jaroslav@1890: else if (leader != null)
jaroslav@1890: available.await();
jaroslav@1890: else {
jaroslav@1890: Thread thisThread = Thread.currentThread();
jaroslav@1890: leader = thisThread;
jaroslav@1890: try {
jaroslav@1890: available.awaitNanos(delay);
jaroslav@1890: } finally {
jaroslav@1890: if (leader == thisThread)
jaroslav@1890: leader = null;
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: } finally {
jaroslav@1890: if (leader == null && queue[0] != null)
jaroslav@1890: available.signal();
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
jaroslav@1890: throws InterruptedException {
jaroslav@1890: long nanos = unit.toNanos(timeout);
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lockInterruptibly();
jaroslav@1890: try {
jaroslav@1890: for (;;) {
jaroslav@1890: RunnableScheduledFuture first = queue[0];
jaroslav@1890: if (first == null) {
jaroslav@1890: if (nanos <= 0)
jaroslav@1890: return null;
jaroslav@1890: else
jaroslav@1890: nanos = available.awaitNanos(nanos);
jaroslav@1890: } else {
jaroslav@1890: long delay = first.getDelay(TimeUnit.NANOSECONDS);
jaroslav@1890: if (delay <= 0)
jaroslav@1890: return finishPoll(first);
jaroslav@1890: if (nanos <= 0)
jaroslav@1890: return null;
jaroslav@1890: if (nanos < delay || leader != null)
jaroslav@1890: nanos = available.awaitNanos(nanos);
jaroslav@1890: else {
jaroslav@1890: Thread thisThread = Thread.currentThread();
jaroslav@1890: leader = thisThread;
jaroslav@1890: try {
jaroslav@1890: long timeLeft = available.awaitNanos(delay);
jaroslav@1890: nanos -= delay - timeLeft;
jaroslav@1890: } finally {
jaroslav@1890: if (leader == thisThread)
jaroslav@1890: leader = null;
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: } finally {
jaroslav@1890: if (leader == null && queue[0] != null)
jaroslav@1890: available.signal();
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public void clear() {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: for (int i = 0; i < size; i++) {
jaroslav@1890: RunnableScheduledFuture t = queue[i];
jaroslav@1890: if (t != null) {
jaroslav@1890: queue[i] = null;
jaroslav@1890: setIndex(t, -1);
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: size = 0;
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Return and remove first element only if it is expired.
jaroslav@1890: * Used only by drainTo. Call only when holding lock.
jaroslav@1890: */
jaroslav@1890: private RunnableScheduledFuture pollExpired() {
jaroslav@1890: RunnableScheduledFuture first = queue[0];
jaroslav@1890: if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
jaroslav@1890: return null;
jaroslav@1890: return finishPoll(first);
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public int drainTo(Collection super Runnable> c) {
jaroslav@1890: if (c == null)
jaroslav@1890: throw new NullPointerException();
jaroslav@1890: if (c == this)
jaroslav@1890: throw new IllegalArgumentException();
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: RunnableScheduledFuture first;
jaroslav@1890: int n = 0;
jaroslav@1890: while ((first = pollExpired()) != null) {
jaroslav@1890: c.add(first);
jaroslav@1890: ++n;
jaroslav@1890: }
jaroslav@1890: return n;
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public int drainTo(Collection super Runnable> c, int maxElements) {
jaroslav@1890: if (c == null)
jaroslav@1890: throw new NullPointerException();
jaroslav@1890: if (c == this)
jaroslav@1890: throw new IllegalArgumentException();
jaroslav@1890: if (maxElements <= 0)
jaroslav@1890: return 0;
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: RunnableScheduledFuture first;
jaroslav@1890: int n = 0;
jaroslav@1890: while (n < maxElements && (first = pollExpired()) != null) {
jaroslav@1890: c.add(first);
jaroslav@1890: ++n;
jaroslav@1890: }
jaroslav@1890: return n;
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public Object[] toArray() {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: return Arrays.copyOf(queue, size, Object[].class);
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: @SuppressWarnings("unchecked")
jaroslav@1890: public T[] toArray(T[] a) {
jaroslav@1890: final ReentrantLock lock = this.lock;
jaroslav@1890: lock.lock();
jaroslav@1890: try {
jaroslav@1890: if (a.length < size)
jaroslav@1890: return (T[]) Arrays.copyOf(queue, size, a.getClass());
jaroslav@1890: System.arraycopy(queue, 0, a, 0, size);
jaroslav@1890: if (a.length > size)
jaroslav@1890: a[size] = null;
jaroslav@1890: return a;
jaroslav@1890: } finally {
jaroslav@1890: lock.unlock();
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public Iterator iterator() {
jaroslav@1890: return new Itr(Arrays.copyOf(queue, size));
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: /**
jaroslav@1890: * Snapshot iterator that works off copy of underlying q array.
jaroslav@1890: */
jaroslav@1890: private class Itr implements Iterator {
jaroslav@1890: final RunnableScheduledFuture[] array;
jaroslav@1890: int cursor = 0; // index of next element to return
jaroslav@1890: int lastRet = -1; // index of last element, or -1 if no such
jaroslav@1890:
jaroslav@1890: Itr(RunnableScheduledFuture[] array) {
jaroslav@1890: this.array = array;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public boolean hasNext() {
jaroslav@1890: return cursor < array.length;
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public Runnable next() {
jaroslav@1890: if (cursor >= array.length)
jaroslav@1890: throw new NoSuchElementException();
jaroslav@1890: lastRet = cursor;
jaroslav@1890: return array[cursor++];
jaroslav@1890: }
jaroslav@1890:
jaroslav@1890: public void remove() {
jaroslav@1890: if (lastRet < 0)
jaroslav@1890: throw new IllegalStateException();
jaroslav@1890: DelayedWorkQueue.this.remove(array[lastRet]);
jaroslav@1890: lastRet = -1;
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: }
jaroslav@1890: }