1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/DelayQueue.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,546 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +
1.40 +package java.util.concurrent;
1.41 +import java.util.concurrent.locks.*;
1.42 +import java.util.*;
1.43 +
1.44 +/**
1.45 + * An unbounded {@linkplain BlockingQueue blocking queue} of
1.46 + * <tt>Delayed</tt> elements, in which an element can only be taken
1.47 + * when its delay has expired. The <em>head</em> of the queue is that
1.48 + * <tt>Delayed</tt> element whose delay expired furthest in the
1.49 + * past. If no delay has expired there is no head and <tt>poll</tt>
1.50 + * will return <tt>null</tt>. Expiration occurs when an element's
1.51 + * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
1.52 + * than or equal to zero. Even though unexpired elements cannot be
1.53 + * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
1.54 + * treated as normal elements. For example, the <tt>size</tt> method
1.55 + * returns the count of both expired and unexpired elements.
1.56 + * This queue does not permit null elements.
1.57 + *
1.58 + * <p>This class and its iterator implement all of the
1.59 + * <em>optional</em> methods of the {@link Collection} and {@link
1.60 + * Iterator} interfaces.
1.61 + *
1.62 + * <p>This class is a member of the
1.63 + * <a href="{@docRoot}/../technotes/guides/collections/index.html">
1.64 + * Java Collections Framework</a>.
1.65 + *
1.66 + * @since 1.5
1.67 + * @author Doug Lea
1.68 + * @param <E> the type of elements held in this collection
1.69 + */
1.70 +
1.71 +public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
1.72 + implements BlockingQueue<E> {
1.73 +
1.74 + private transient final ReentrantLock lock = new ReentrantLock();
1.75 + private final PriorityQueue<E> q = new PriorityQueue<E>();
1.76 +
1.77 + /**
1.78 + * Thread designated to wait for the element at the head of
1.79 + * the queue. This variant of the Leader-Follower pattern
1.80 + * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
1.81 + * minimize unnecessary timed waiting. When a thread becomes
1.82 + * the leader, it waits only for the next delay to elapse, but
1.83 + * other threads await indefinitely. The leader thread must
1.84 + * signal some other thread before returning from take() or
1.85 + * poll(...), unless some other thread becomes leader in the
1.86 + * interim. Whenever the head of the queue is replaced with
1.87 + * an element with an earlier expiration time, the leader
1.88 + * field is invalidated by being reset to null, and some
1.89 + * waiting thread, but not necessarily the current leader, is
1.90 + * signalled. So waiting threads must be prepared to acquire
1.91 + * and lose leadership while waiting.
1.92 + */
1.93 + private Thread leader = null;
1.94 +
1.95 + /**
1.96 + * Condition signalled when a newer element becomes available
1.97 + * at the head of the queue or a new thread may need to
1.98 + * become leader.
1.99 + */
1.100 + private final Condition available = lock.newCondition();
1.101 +
1.102 + /**
1.103 + * Creates a new <tt>DelayQueue</tt> that is initially empty.
1.104 + */
1.105 + public DelayQueue() {}
1.106 +
1.107 + /**
1.108 + * Creates a <tt>DelayQueue</tt> initially containing the elements of the
1.109 + * given collection of {@link Delayed} instances.
1.110 + *
1.111 + * @param c the collection of elements to initially contain
1.112 + * @throws NullPointerException if the specified collection or any
1.113 + * of its elements are null
1.114 + */
1.115 + public DelayQueue(Collection<? extends E> c) {
1.116 + this.addAll(c);
1.117 + }
1.118 +
1.119 + /**
1.120 + * Inserts the specified element into this delay queue.
1.121 + *
1.122 + * @param e the element to add
1.123 + * @return <tt>true</tt> (as specified by {@link Collection#add})
1.124 + * @throws NullPointerException if the specified element is null
1.125 + */
1.126 + public boolean add(E e) {
1.127 + return offer(e);
1.128 + }
1.129 +
1.130 + /**
1.131 + * Inserts the specified element into this delay queue.
1.132 + *
1.133 + * @param e the element to add
1.134 + * @return <tt>true</tt>
1.135 + * @throws NullPointerException if the specified element is null
1.136 + */
1.137 + public boolean offer(E e) {
1.138 + final ReentrantLock lock = this.lock;
1.139 + lock.lock();
1.140 + try {
1.141 + q.offer(e);
1.142 + if (q.peek() == e) {
1.143 + leader = null;
1.144 + available.signal();
1.145 + }
1.146 + return true;
1.147 + } finally {
1.148 + lock.unlock();
1.149 + }
1.150 + }
1.151 +
1.152 + /**
1.153 + * Inserts the specified element into this delay queue. As the queue is
1.154 + * unbounded this method will never block.
1.155 + *
1.156 + * @param e the element to add
1.157 + * @throws NullPointerException {@inheritDoc}
1.158 + */
1.159 + public void put(E e) {
1.160 + offer(e);
1.161 + }
1.162 +
1.163 + /**
1.164 + * Inserts the specified element into this delay queue. As the queue is
1.165 + * unbounded this method will never block.
1.166 + *
1.167 + * @param e the element to add
1.168 + * @param timeout This parameter is ignored as the method never blocks
1.169 + * @param unit This parameter is ignored as the method never blocks
1.170 + * @return <tt>true</tt>
1.171 + * @throws NullPointerException {@inheritDoc}
1.172 + */
1.173 + public boolean offer(E e, long timeout, TimeUnit unit) {
1.174 + return offer(e);
1.175 + }
1.176 +
1.177 + /**
1.178 + * Retrieves and removes the head of this queue, or returns <tt>null</tt>
1.179 + * if this queue has no elements with an expired delay.
1.180 + *
1.181 + * @return the head of this queue, or <tt>null</tt> if this
1.182 + * queue has no elements with an expired delay
1.183 + */
1.184 + public E poll() {
1.185 + final ReentrantLock lock = this.lock;
1.186 + lock.lock();
1.187 + try {
1.188 + E first = q.peek();
1.189 + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1.190 + return null;
1.191 + else
1.192 + return q.poll();
1.193 + } finally {
1.194 + lock.unlock();
1.195 + }
1.196 + }
1.197 +
1.198 + /**
1.199 + * Retrieves and removes the head of this queue, waiting if necessary
1.200 + * until an element with an expired delay is available on this queue.
1.201 + *
1.202 + * @return the head of this queue
1.203 + * @throws InterruptedException {@inheritDoc}
1.204 + */
1.205 + public E take() throws InterruptedException {
1.206 + final ReentrantLock lock = this.lock;
1.207 + lock.lockInterruptibly();
1.208 + try {
1.209 + for (;;) {
1.210 + E first = q.peek();
1.211 + if (first == null)
1.212 + available.await();
1.213 + else {
1.214 + long delay = first.getDelay(TimeUnit.NANOSECONDS);
1.215 + if (delay <= 0)
1.216 + return q.poll();
1.217 + else if (leader != null)
1.218 + available.await();
1.219 + else {
1.220 + Thread thisThread = Thread.currentThread();
1.221 + leader = thisThread;
1.222 + try {
1.223 + available.awaitNanos(delay);
1.224 + } finally {
1.225 + if (leader == thisThread)
1.226 + leader = null;
1.227 + }
1.228 + }
1.229 + }
1.230 + }
1.231 + } finally {
1.232 + if (leader == null && q.peek() != null)
1.233 + available.signal();
1.234 + lock.unlock();
1.235 + }
1.236 + }
1.237 +
1.238 + /**
1.239 + * Retrieves and removes the head of this queue, waiting if necessary
1.240 + * until an element with an expired delay is available on this queue,
1.241 + * or the specified wait time expires.
1.242 + *
1.243 + * @return the head of this queue, or <tt>null</tt> if the
1.244 + * specified waiting time elapses before an element with
1.245 + * an expired delay becomes available
1.246 + * @throws InterruptedException {@inheritDoc}
1.247 + */
1.248 + public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1.249 + long nanos = unit.toNanos(timeout);
1.250 + final ReentrantLock lock = this.lock;
1.251 + lock.lockInterruptibly();
1.252 + try {
1.253 + for (;;) {
1.254 + E first = q.peek();
1.255 + if (first == null) {
1.256 + if (nanos <= 0)
1.257 + return null;
1.258 + else
1.259 + nanos = available.awaitNanos(nanos);
1.260 + } else {
1.261 + long delay = first.getDelay(TimeUnit.NANOSECONDS);
1.262 + if (delay <= 0)
1.263 + return q.poll();
1.264 + if (nanos <= 0)
1.265 + return null;
1.266 + if (nanos < delay || leader != null)
1.267 + nanos = available.awaitNanos(nanos);
1.268 + else {
1.269 + Thread thisThread = Thread.currentThread();
1.270 + leader = thisThread;
1.271 + try {
1.272 + long timeLeft = available.awaitNanos(delay);
1.273 + nanos -= delay - timeLeft;
1.274 + } finally {
1.275 + if (leader == thisThread)
1.276 + leader = null;
1.277 + }
1.278 + }
1.279 + }
1.280 + }
1.281 + } finally {
1.282 + if (leader == null && q.peek() != null)
1.283 + available.signal();
1.284 + lock.unlock();
1.285 + }
1.286 + }
1.287 +
1.288 + /**
1.289 + * Retrieves, but does not remove, the head of this queue, or
1.290 + * returns <tt>null</tt> if this queue is empty. Unlike
1.291 + * <tt>poll</tt>, if no expired elements are available in the queue,
1.292 + * this method returns the element that will expire next,
1.293 + * if one exists.
1.294 + *
1.295 + * @return the head of this queue, or <tt>null</tt> if this
1.296 + * queue is empty.
1.297 + */
1.298 + public E peek() {
1.299 + final ReentrantLock lock = this.lock;
1.300 + lock.lock();
1.301 + try {
1.302 + return q.peek();
1.303 + } finally {
1.304 + lock.unlock();
1.305 + }
1.306 + }
1.307 +
1.308 + public int size() {
1.309 + final ReentrantLock lock = this.lock;
1.310 + lock.lock();
1.311 + try {
1.312 + return q.size();
1.313 + } finally {
1.314 + lock.unlock();
1.315 + }
1.316 + }
1.317 +
1.318 + /**
1.319 + * @throws UnsupportedOperationException {@inheritDoc}
1.320 + * @throws ClassCastException {@inheritDoc}
1.321 + * @throws NullPointerException {@inheritDoc}
1.322 + * @throws IllegalArgumentException {@inheritDoc}
1.323 + */
1.324 + public int drainTo(Collection<? super E> c) {
1.325 + if (c == null)
1.326 + throw new NullPointerException();
1.327 + if (c == this)
1.328 + throw new IllegalArgumentException();
1.329 + final ReentrantLock lock = this.lock;
1.330 + lock.lock();
1.331 + try {
1.332 + int n = 0;
1.333 + for (;;) {
1.334 + E first = q.peek();
1.335 + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1.336 + break;
1.337 + c.add(q.poll());
1.338 + ++n;
1.339 + }
1.340 + return n;
1.341 + } finally {
1.342 + lock.unlock();
1.343 + }
1.344 + }
1.345 +
1.346 + /**
1.347 + * @throws UnsupportedOperationException {@inheritDoc}
1.348 + * @throws ClassCastException {@inheritDoc}
1.349 + * @throws NullPointerException {@inheritDoc}
1.350 + * @throws IllegalArgumentException {@inheritDoc}
1.351 + */
1.352 + public int drainTo(Collection<? super E> c, int maxElements) {
1.353 + if (c == null)
1.354 + throw new NullPointerException();
1.355 + if (c == this)
1.356 + throw new IllegalArgumentException();
1.357 + if (maxElements <= 0)
1.358 + return 0;
1.359 + final ReentrantLock lock = this.lock;
1.360 + lock.lock();
1.361 + try {
1.362 + int n = 0;
1.363 + while (n < maxElements) {
1.364 + E first = q.peek();
1.365 + if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1.366 + break;
1.367 + c.add(q.poll());
1.368 + ++n;
1.369 + }
1.370 + return n;
1.371 + } finally {
1.372 + lock.unlock();
1.373 + }
1.374 + }
1.375 +
1.376 + /**
1.377 + * Atomically removes all of the elements from this delay queue.
1.378 + * The queue will be empty after this call returns.
1.379 + * Elements with an unexpired delay are not waited for; they are
1.380 + * simply discarded from the queue.
1.381 + */
1.382 + public void clear() {
1.383 + final ReentrantLock lock = this.lock;
1.384 + lock.lock();
1.385 + try {
1.386 + q.clear();
1.387 + } finally {
1.388 + lock.unlock();
1.389 + }
1.390 + }
1.391 +
1.392 + /**
1.393 + * Always returns <tt>Integer.MAX_VALUE</tt> because
1.394 + * a <tt>DelayQueue</tt> is not capacity constrained.
1.395 + *
1.396 + * @return <tt>Integer.MAX_VALUE</tt>
1.397 + */
1.398 + public int remainingCapacity() {
1.399 + return Integer.MAX_VALUE;
1.400 + }
1.401 +
1.402 + /**
1.403 + * Returns an array containing all of the elements in this queue.
1.404 + * The returned array elements are in no particular order.
1.405 + *
1.406 + * <p>The returned array will be "safe" in that no references to it are
1.407 + * maintained by this queue. (In other words, this method must allocate
1.408 + * a new array). The caller is thus free to modify the returned array.
1.409 + *
1.410 + * <p>This method acts as bridge between array-based and collection-based
1.411 + * APIs.
1.412 + *
1.413 + * @return an array containing all of the elements in this queue
1.414 + */
1.415 + public Object[] toArray() {
1.416 + final ReentrantLock lock = this.lock;
1.417 + lock.lock();
1.418 + try {
1.419 + return q.toArray();
1.420 + } finally {
1.421 + lock.unlock();
1.422 + }
1.423 + }
1.424 +
1.425 + /**
1.426 + * Returns an array containing all of the elements in this queue; the
1.427 + * runtime type of the returned array is that of the specified array.
1.428 + * The returned array elements are in no particular order.
1.429 + * If the queue fits in the specified array, it is returned therein.
1.430 + * Otherwise, a new array is allocated with the runtime type of the
1.431 + * specified array and the size of this queue.
1.432 + *
1.433 + * <p>If this queue fits in the specified array with room to spare
1.434 + * (i.e., the array has more elements than this queue), the element in
1.435 + * the array immediately following the end of the queue is set to
1.436 + * <tt>null</tt>.
1.437 + *
1.438 + * <p>Like the {@link #toArray()} method, this method acts as bridge between
1.439 + * array-based and collection-based APIs. Further, this method allows
1.440 + * precise control over the runtime type of the output array, and may,
1.441 + * under certain circumstances, be used to save allocation costs.
1.442 + *
1.443 + * <p>The following code can be used to dump a delay queue into a newly
1.444 + * allocated array of <tt>Delayed</tt>:
1.445 + *
1.446 + * <pre>
1.447 + * Delayed[] a = q.toArray(new Delayed[0]);</pre>
1.448 + *
1.449 + * Note that <tt>toArray(new Object[0])</tt> is identical in function to
1.450 + * <tt>toArray()</tt>.
1.451 + *
1.452 + * @param a the array into which the elements of the queue are to
1.453 + * be stored, if it is big enough; otherwise, a new array of the
1.454 + * same runtime type is allocated for this purpose
1.455 + * @return an array containing all of the elements in this queue
1.456 + * @throws ArrayStoreException if the runtime type of the specified array
1.457 + * is not a supertype of the runtime type of every element in
1.458 + * this queue
1.459 + * @throws NullPointerException if the specified array is null
1.460 + */
1.461 + public <T> T[] toArray(T[] a) {
1.462 + final ReentrantLock lock = this.lock;
1.463 + lock.lock();
1.464 + try {
1.465 + return q.toArray(a);
1.466 + } finally {
1.467 + lock.unlock();
1.468 + }
1.469 + }
1.470 +
1.471 + /**
1.472 + * Removes a single instance of the specified element from this
1.473 + * queue, if it is present, whether or not it has expired.
1.474 + */
1.475 + public boolean remove(Object o) {
1.476 + final ReentrantLock lock = this.lock;
1.477 + lock.lock();
1.478 + try {
1.479 + return q.remove(o);
1.480 + } finally {
1.481 + lock.unlock();
1.482 + }
1.483 + }
1.484 +
1.485 + /**
1.486 + * Returns an iterator over all the elements (both expired and
1.487 + * unexpired) in this queue. The iterator does not return the
1.488 + * elements in any particular order.
1.489 + *
1.490 + * <p>The returned iterator is a "weakly consistent" iterator that
1.491 + * will never throw {@link java.util.ConcurrentModificationException
1.492 + * ConcurrentModificationException}, and guarantees to traverse
1.493 + * elements as they existed upon construction of the iterator, and
1.494 + * may (but is not guaranteed to) reflect any modifications
1.495 + * subsequent to construction.
1.496 + *
1.497 + * @return an iterator over the elements in this queue
1.498 + */
1.499 + public Iterator<E> iterator() {
1.500 + return new Itr(toArray());
1.501 + }
1.502 +
1.503 + /**
1.504 + * Snapshot iterator that works off copy of underlying q array.
1.505 + */
1.506 + private class Itr implements Iterator<E> {
1.507 + final Object[] array; // Array of all elements
1.508 + int cursor; // index of next element to return;
1.509 + int lastRet; // index of last element, or -1 if no such
1.510 +
1.511 + Itr(Object[] array) {
1.512 + lastRet = -1;
1.513 + this.array = array;
1.514 + }
1.515 +
1.516 + public boolean hasNext() {
1.517 + return cursor < array.length;
1.518 + }
1.519 +
1.520 + @SuppressWarnings("unchecked")
1.521 + public E next() {
1.522 + if (cursor >= array.length)
1.523 + throw new NoSuchElementException();
1.524 + lastRet = cursor;
1.525 + return (E)array[cursor++];
1.526 + }
1.527 +
1.528 + public void remove() {
1.529 + if (lastRet < 0)
1.530 + throw new IllegalStateException();
1.531 + Object x = array[lastRet];
1.532 + lastRet = -1;
1.533 + // Traverse underlying queue to find == element,
1.534 + // not just a .equals element.
1.535 + lock.lock();
1.536 + try {
1.537 + for (Iterator it = q.iterator(); it.hasNext(); ) {
1.538 + if (it.next() == x) {
1.539 + it.remove();
1.540 + return;
1.541 + }
1.542 + }
1.543 + } finally {
1.544 + lock.unlock();
1.545 + }
1.546 + }
1.547 + }
1.548 +
1.549 +}