rt/emul/compact/src/main/java/java/util/concurrent/DelayQueue.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branchjdk7-b147
changeset 1890 212417b74b72
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 
    37 package java.util.concurrent;
    38 import java.util.concurrent.locks.*;
    39 import java.util.*;
    40 
    41 /**
    42  * An unbounded {@linkplain BlockingQueue blocking queue} of
    43  * <tt>Delayed</tt> elements, in which an element can only be taken
    44  * when its delay has expired.  The <em>head</em> of the queue is that
    45  * <tt>Delayed</tt> element whose delay expired furthest in the
    46  * past.  If no delay has expired there is no head and <tt>poll</tt>
    47  * will return <tt>null</tt>. Expiration occurs when an element's
    48  * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
    49  * than or equal to zero.  Even though unexpired elements cannot be
    50  * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
    51  * treated as normal elements. For example, the <tt>size</tt> method
    52  * returns the count of both expired and unexpired elements.
    53  * This queue does not permit null elements.
    54  *
    55  * <p>This class and its iterator implement all of the
    56  * <em>optional</em> methods of the {@link Collection} and {@link
    57  * Iterator} interfaces.
    58  *
    59  * <p>This class is a member of the
    60  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    61  * Java Collections Framework</a>.
    62  *
    63  * @since 1.5
    64  * @author Doug Lea
    65  * @param <E> the type of elements held in this collection
    66  */
    67 
    68 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    69     implements BlockingQueue<E> {
    70 
    71     private transient final ReentrantLock lock = new ReentrantLock();
    72     private final PriorityQueue<E> q = new PriorityQueue<E>();
    73 
    74     /**
    75      * Thread designated to wait for the element at the head of
    76      * the queue.  This variant of the Leader-Follower pattern
    77      * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
    78      * minimize unnecessary timed waiting.  When a thread becomes
    79      * the leader, it waits only for the next delay to elapse, but
    80      * other threads await indefinitely.  The leader thread must
    81      * signal some other thread before returning from take() or
    82      * poll(...), unless some other thread becomes leader in the
    83      * interim.  Whenever the head of the queue is replaced with
    84      * an element with an earlier expiration time, the leader
    85      * field is invalidated by being reset to null, and some
    86      * waiting thread, but not necessarily the current leader, is
    87      * signalled.  So waiting threads must be prepared to acquire
    88      * and lose leadership while waiting.
    89      */
    90     private Thread leader = null;
    91 
    92     /**
    93      * Condition signalled when a newer element becomes available
    94      * at the head of the queue or a new thread may need to
    95      * become leader.
    96      */
    97     private final Condition available = lock.newCondition();
    98 
    99     /**
   100      * Creates a new <tt>DelayQueue</tt> that is initially empty.
   101      */
   102     public DelayQueue() {}
   103 
   104     /**
   105      * Creates a <tt>DelayQueue</tt> initially containing the elements of the
   106      * given collection of {@link Delayed} instances.
   107      *
   108      * @param c the collection of elements to initially contain
   109      * @throws NullPointerException if the specified collection or any
   110      *         of its elements are null
   111      */
   112     public DelayQueue(Collection<? extends E> c) {
   113         this.addAll(c);
   114     }
   115 
   116     /**
   117      * Inserts the specified element into this delay queue.
   118      *
   119      * @param e the element to add
   120      * @return <tt>true</tt> (as specified by {@link Collection#add})
   121      * @throws NullPointerException if the specified element is null
   122      */
   123     public boolean add(E e) {
   124         return offer(e);
   125     }
   126 
   127     /**
   128      * Inserts the specified element into this delay queue.
   129      *
   130      * @param e the element to add
   131      * @return <tt>true</tt>
   132      * @throws NullPointerException if the specified element is null
   133      */
   134     public boolean offer(E e) {
   135         final ReentrantLock lock = this.lock;
   136         lock.lock();
   137         try {
   138             q.offer(e);
   139             if (q.peek() == e) {
   140                 leader = null;
   141                 available.signal();
   142             }
   143             return true;
   144         } finally {
   145             lock.unlock();
   146         }
   147     }
   148 
   149     /**
   150      * Inserts the specified element into this delay queue. As the queue is
   151      * unbounded this method will never block.
   152      *
   153      * @param e the element to add
   154      * @throws NullPointerException {@inheritDoc}
   155      */
   156     public void put(E e) {
   157         offer(e);
   158     }
   159 
   160     /**
   161      * Inserts the specified element into this delay queue. As the queue is
   162      * unbounded this method will never block.
   163      *
   164      * @param e the element to add
   165      * @param timeout This parameter is ignored as the method never blocks
   166      * @param unit This parameter is ignored as the method never blocks
   167      * @return <tt>true</tt>
   168      * @throws NullPointerException {@inheritDoc}
   169      */
   170     public boolean offer(E e, long timeout, TimeUnit unit) {
   171         return offer(e);
   172     }
   173 
   174     /**
   175      * Retrieves and removes the head of this queue, or returns <tt>null</tt>
   176      * if this queue has no elements with an expired delay.
   177      *
   178      * @return the head of this queue, or <tt>null</tt> if this
   179      *         queue has no elements with an expired delay
   180      */
   181     public E poll() {
   182         final ReentrantLock lock = this.lock;
   183         lock.lock();
   184         try {
   185             E first = q.peek();
   186             if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
   187                 return null;
   188             else
   189                 return q.poll();
   190         } finally {
   191             lock.unlock();
   192         }
   193     }
   194 
   195     /**
   196      * Retrieves and removes the head of this queue, waiting if necessary
   197      * until an element with an expired delay is available on this queue.
   198      *
   199      * @return the head of this queue
   200      * @throws InterruptedException {@inheritDoc}
   201      */
   202     public E take() throws InterruptedException {
   203         final ReentrantLock lock = this.lock;
   204         lock.lockInterruptibly();
   205         try {
   206             for (;;) {
   207                 E first = q.peek();
   208                 if (first == null)
   209                     available.await();
   210                 else {
   211                     long delay = first.getDelay(TimeUnit.NANOSECONDS);
   212                     if (delay <= 0)
   213                         return q.poll();
   214                     else if (leader != null)
   215                         available.await();
   216                     else {
   217                         Thread thisThread = Thread.currentThread();
   218                         leader = thisThread;
   219                         try {
   220                             available.awaitNanos(delay);
   221                         } finally {
   222                             if (leader == thisThread)
   223                                 leader = null;
   224                         }
   225                     }
   226                 }
   227             }
   228         } finally {
   229             if (leader == null && q.peek() != null)
   230                 available.signal();
   231             lock.unlock();
   232         }
   233     }
   234 
   235     /**
   236      * Retrieves and removes the head of this queue, waiting if necessary
   237      * until an element with an expired delay is available on this queue,
   238      * or the specified wait time expires.
   239      *
   240      * @return the head of this queue, or <tt>null</tt> if the
   241      *         specified waiting time elapses before an element with
   242      *         an expired delay becomes available
   243      * @throws InterruptedException {@inheritDoc}
   244      */
   245     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   246         long nanos = unit.toNanos(timeout);
   247         final ReentrantLock lock = this.lock;
   248         lock.lockInterruptibly();
   249         try {
   250             for (;;) {
   251                 E first = q.peek();
   252                 if (first == null) {
   253                     if (nanos <= 0)
   254                         return null;
   255                     else
   256                         nanos = available.awaitNanos(nanos);
   257                 } else {
   258                     long delay = first.getDelay(TimeUnit.NANOSECONDS);
   259                     if (delay <= 0)
   260                         return q.poll();
   261                     if (nanos <= 0)
   262                         return null;
   263                     if (nanos < delay || leader != null)
   264                         nanos = available.awaitNanos(nanos);
   265                     else {
   266                         Thread thisThread = Thread.currentThread();
   267                         leader = thisThread;
   268                         try {
   269                             long timeLeft = available.awaitNanos(delay);
   270                             nanos -= delay - timeLeft;
   271                         } finally {
   272                             if (leader == thisThread)
   273                                 leader = null;
   274                         }
   275                     }
   276                 }
   277             }
   278         } finally {
   279             if (leader == null && q.peek() != null)
   280                 available.signal();
   281             lock.unlock();
   282         }
   283     }
   284 
   285     /**
   286      * Retrieves, but does not remove, the head of this queue, or
   287      * returns <tt>null</tt> if this queue is empty.  Unlike
   288      * <tt>poll</tt>, if no expired elements are available in the queue,
   289      * this method returns the element that will expire next,
   290      * if one exists.
   291      *
   292      * @return the head of this queue, or <tt>null</tt> if this
   293      *         queue is empty.
   294      */
   295     public E peek() {
   296         final ReentrantLock lock = this.lock;
   297         lock.lock();
   298         try {
   299             return q.peek();
   300         } finally {
   301             lock.unlock();
   302         }
   303     }
   304 
   305     public int size() {
   306         final ReentrantLock lock = this.lock;
   307         lock.lock();
   308         try {
   309             return q.size();
   310         } finally {
   311             lock.unlock();
   312         }
   313     }
   314 
   315     /**
   316      * @throws UnsupportedOperationException {@inheritDoc}
   317      * @throws ClassCastException            {@inheritDoc}
   318      * @throws NullPointerException          {@inheritDoc}
   319      * @throws IllegalArgumentException      {@inheritDoc}
   320      */
   321     public int drainTo(Collection<? super E> c) {
   322         if (c == null)
   323             throw new NullPointerException();
   324         if (c == this)
   325             throw new IllegalArgumentException();
   326         final ReentrantLock lock = this.lock;
   327         lock.lock();
   328         try {
   329             int n = 0;
   330             for (;;) {
   331                 E first = q.peek();
   332                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
   333                     break;
   334                 c.add(q.poll());
   335                 ++n;
   336             }
   337             return n;
   338         } finally {
   339             lock.unlock();
   340         }
   341     }
   342 
   343     /**
   344      * @throws UnsupportedOperationException {@inheritDoc}
   345      * @throws ClassCastException            {@inheritDoc}
   346      * @throws NullPointerException          {@inheritDoc}
   347      * @throws IllegalArgumentException      {@inheritDoc}
   348      */
   349     public int drainTo(Collection<? super E> c, int maxElements) {
   350         if (c == null)
   351             throw new NullPointerException();
   352         if (c == this)
   353             throw new IllegalArgumentException();
   354         if (maxElements <= 0)
   355             return 0;
   356         final ReentrantLock lock = this.lock;
   357         lock.lock();
   358         try {
   359             int n = 0;
   360             while (n < maxElements) {
   361                 E first = q.peek();
   362                 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
   363                     break;
   364                 c.add(q.poll());
   365                 ++n;
   366             }
   367             return n;
   368         } finally {
   369             lock.unlock();
   370         }
   371     }
   372 
   373     /**
   374      * Atomically removes all of the elements from this delay queue.
   375      * The queue will be empty after this call returns.
   376      * Elements with an unexpired delay are not waited for; they are
   377      * simply discarded from the queue.
   378      */
   379     public void clear() {
   380         final ReentrantLock lock = this.lock;
   381         lock.lock();
   382         try {
   383             q.clear();
   384         } finally {
   385             lock.unlock();
   386         }
   387     }
   388 
   389     /**
   390      * Always returns <tt>Integer.MAX_VALUE</tt> because
   391      * a <tt>DelayQueue</tt> is not capacity constrained.
   392      *
   393      * @return <tt>Integer.MAX_VALUE</tt>
   394      */
   395     public int remainingCapacity() {
   396         return Integer.MAX_VALUE;
   397     }
   398 
   399     /**
   400      * Returns an array containing all of the elements in this queue.
   401      * The returned array elements are in no particular order.
   402      *
   403      * <p>The returned array will be "safe" in that no references to it are
   404      * maintained by this queue.  (In other words, this method must allocate
   405      * a new array).  The caller is thus free to modify the returned array.
   406      *
   407      * <p>This method acts as bridge between array-based and collection-based
   408      * APIs.
   409      *
   410      * @return an array containing all of the elements in this queue
   411      */
   412     public Object[] toArray() {
   413         final ReentrantLock lock = this.lock;
   414         lock.lock();
   415         try {
   416             return q.toArray();
   417         } finally {
   418             lock.unlock();
   419         }
   420     }
   421 
   422     /**
   423      * Returns an array containing all of the elements in this queue; the
   424      * runtime type of the returned array is that of the specified array.
   425      * The returned array elements are in no particular order.
   426      * If the queue fits in the specified array, it is returned therein.
   427      * Otherwise, a new array is allocated with the runtime type of the
   428      * specified array and the size of this queue.
   429      *
   430      * <p>If this queue fits in the specified array with room to spare
   431      * (i.e., the array has more elements than this queue), the element in
   432      * the array immediately following the end of the queue is set to
   433      * <tt>null</tt>.
   434      *
   435      * <p>Like the {@link #toArray()} method, this method acts as bridge between
   436      * array-based and collection-based APIs.  Further, this method allows
   437      * precise control over the runtime type of the output array, and may,
   438      * under certain circumstances, be used to save allocation costs.
   439      *
   440      * <p>The following code can be used to dump a delay queue into a newly
   441      * allocated array of <tt>Delayed</tt>:
   442      *
   443      * <pre>
   444      *     Delayed[] a = q.toArray(new Delayed[0]);</pre>
   445      *
   446      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
   447      * <tt>toArray()</tt>.
   448      *
   449      * @param a the array into which the elements of the queue are to
   450      *          be stored, if it is big enough; otherwise, a new array of the
   451      *          same runtime type is allocated for this purpose
   452      * @return an array containing all of the elements in this queue
   453      * @throws ArrayStoreException if the runtime type of the specified array
   454      *         is not a supertype of the runtime type of every element in
   455      *         this queue
   456      * @throws NullPointerException if the specified array is null
   457      */
   458     public <T> T[] toArray(T[] a) {
   459         final ReentrantLock lock = this.lock;
   460         lock.lock();
   461         try {
   462             return q.toArray(a);
   463         } finally {
   464             lock.unlock();
   465         }
   466     }
   467 
   468     /**
   469      * Removes a single instance of the specified element from this
   470      * queue, if it is present, whether or not it has expired.
   471      */
   472     public boolean remove(Object o) {
   473         final ReentrantLock lock = this.lock;
   474         lock.lock();
   475         try {
   476             return q.remove(o);
   477         } finally {
   478             lock.unlock();
   479         }
   480     }
   481 
   482     /**
   483      * Returns an iterator over all the elements (both expired and
   484      * unexpired) in this queue. The iterator does not return the
   485      * elements in any particular order.
   486      *
   487      * <p>The returned iterator is a "weakly consistent" iterator that
   488      * will never throw {@link java.util.ConcurrentModificationException
   489      * ConcurrentModificationException}, and guarantees to traverse
   490      * elements as they existed upon construction of the iterator, and
   491      * may (but is not guaranteed to) reflect any modifications
   492      * subsequent to construction.
   493      *
   494      * @return an iterator over the elements in this queue
   495      */
   496     public Iterator<E> iterator() {
   497         return new Itr(toArray());
   498     }
   499 
   500     /**
   501      * Snapshot iterator that works off copy of underlying q array.
   502      */
   503     private class Itr implements Iterator<E> {
   504         final Object[] array; // Array of all elements
   505         int cursor;           // index of next element to return;
   506         int lastRet;          // index of last element, or -1 if no such
   507 
   508         Itr(Object[] array) {
   509             lastRet = -1;
   510             this.array = array;
   511         }
   512 
   513         public boolean hasNext() {
   514             return cursor < array.length;
   515         }
   516 
   517         @SuppressWarnings("unchecked")
   518         public E next() {
   519             if (cursor >= array.length)
   520                 throw new NoSuchElementException();
   521             lastRet = cursor;
   522             return (E)array[cursor++];
   523         }
   524 
   525         public void remove() {
   526             if (lastRet < 0)
   527                 throw new IllegalStateException();
   528             Object x = array[lastRet];
   529             lastRet = -1;
   530             // Traverse underlying queue to find == element,
   531             // not just a .equals element.
   532             lock.lock();
   533             try {
   534                 for (Iterator it = q.iterator(); it.hasNext(); ) {
   535                     if (it.next() == x) {
   536                         it.remove();
   537                         return;
   538                     }
   539                 }
   540             } finally {
   541                 lock.unlock();
   542             }
   543         }
   544     }
   545 
   546 }