2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
36 package java.util.concurrent;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.locks.ReentrantLock;
41 import java.util.AbstractQueue;
42 import java.util.Collection;
43 import java.util.Iterator;
44 import java.util.NoSuchElementException;
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
78 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
79 implements BlockingQueue<E>, java.io.Serializable {
80 private static final long serialVersionUID = -6903933977591709194L;
/*
 * A variant of the "two lock queue" algorithm. The putLock gates
 * entry to put (and offer), and has an associated condition for
 * waiting puts. Similarly for the takeLock. The "count" field
 * that they both rely on is maintained as an atomic to avoid
 * needing to get both locks in most cases. Also, to minimize need
 * for puts to get takeLock and vice-versa, cascading notifies are
 * used. When a put notices that it has enabled at least one take,
 * it signals taker. That taker in turn signals others if more
 * items have been entered since the signal. And symmetrically for
 * takes signalling puts. Operations such as remove(Object) and
 * iterators acquire both locks.
 *
 * Visibility between writers and readers is provided as follows:
 *
 * Whenever an element is enqueued, the putLock is acquired and
 * count updated. A subsequent reader guarantees visibility to the
 * enqueued Node by either acquiring the putLock (via fullyLock)
 * or by acquiring the takeLock, and then reading n = count.get();
 * this gives visibility to the first n items.
 *
 * To implement weakly consistent iterators, it appears we need to
 * keep all Nodes GC-reachable from a predecessor dequeued Node.
 * That would cause two problems:
 * - allow a rogue Iterator to cause unbounded memory retention
 * - cause cross-generational linking of old Nodes to new Nodes if
 *   a Node was tenured while live, which generational GCs have a
 *   hard time dealing with, causing repeated major collections.
 * However, only non-deleted Nodes need to be reachable from
 * dequeued Nodes, and reachability does not necessarily have to
 * be of the kind understood by the GC. We use the trick of
 * linking a Node that has just been dequeued to itself. Such a
 * self-link implicitly means to advance to head.next.
 */
118 * Linked list node class
120 static class Node<E> {
125 * - the real successor Node
126 * - this Node, meaning the successor is head.next
127 * - null, meaning there is no successor (this is the last node)
131 Node(E x) { item = x; }
134 /** The capacity bound, or Integer.MAX_VALUE if none */
135 private final int capacity;
137 /** Current number of elements */
138 private final AtomicInteger count = new AtomicInteger(0);
141 * Head of linked list.
142 * Invariant: head.item == null
144 private transient Node<E> head;
147 * Tail of linked list.
148 * Invariant: last.next == null
150 private transient Node<E> last;
152 /** Lock held by take, poll, etc */
153 private final ReentrantLock takeLock = new ReentrantLock();
155 /** Wait queue for waiting takes */
156 private final Condition notEmpty = takeLock.newCondition();
158 /** Lock held by put, offer, etc */
159 private final ReentrantLock putLock = new ReentrantLock();
161 /** Wait queue for waiting puts */
162 private final Condition notFull = putLock.newCondition();
165 * Signals a waiting take. Called only from put/offer (which do not
166 * otherwise ordinarily lock takeLock.)
168 private void signalNotEmpty() {
169 final ReentrantLock takeLock = this.takeLock;
179 * Signals a waiting put. Called only from take/poll.
181 private void signalNotFull() {
182 final ReentrantLock putLock = this.putLock;
192 * Links node at end of queue.
194 * @param node the node
196 private void enqueue(Node<E> node) {
197 // assert putLock.isHeldByCurrentThread();
198 // assert last.next == null;
199 last = last.next = node;
203 * Removes a node from head of queue.
207 private E dequeue() {
208 // assert takeLock.isHeldByCurrentThread();
209 // assert head.item == null;
211 Node<E> first = h.next;
212 h.next = h; // help GC
220 * Lock to prevent both puts and takes.
228 * Unlock to allow both puts and takes.
//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *         than zero
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // Start with a single item-less node serving as both head and tail.
    last = head = new Node<E>(null);
}
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, initially containing the elements of the
 * given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
295 // this doc comment is overridden to remove the reference to collections
296 // greater in size than Integer.MAX_VALUE
298 * Returns the number of elements in this queue.
300 * @return the number of elements in this queue
306 // this doc comment is a modified copy of the inherited doc comment,
307 // without the reference to unlimited queues.
309 * Returns the number of additional elements that this queue can ideally
310 * (in the absence of memory or resource constraints) accept without
311 * blocking. This is always equal to the initial capacity of this queue
312 * less the current {@code size} of this queue.
314 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
315 * an element will succeed by inspecting {@code remainingCapacity}
316 * because it may be the case that another thread is about to
317 * insert or remove an element.
319 public int remainingCapacity() {
320 return capacity - count.get();
324 * Inserts the specified element at the tail of this queue, waiting if
325 * necessary for space to become available.
327 * @throws InterruptedException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
330 public void put(E e) throws InterruptedException {
331 if (e == null) throw new NullPointerException();
332 // Note: convention in all put/take/etc is to preset local var
333 // holding count negative to indicate failure unless set.
335 Node<E> node = new Node(e);
336 final ReentrantLock putLock = this.putLock;
337 final AtomicInteger count = this.count;
338 putLock.lockInterruptibly();
341 * Note that count is used in wait guard even though it is
342 * not protected by lock. This works because count can
343 * only decrease at this point (all other puts are shut
344 * out by lock), and we (or some other waiting put) are
345 * signalled if it ever changes from capacity. Similarly
346 * for all other uses of count in other wait guards.
348 while (count.get() == capacity) {
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
363 * Inserts the specified element at the tail of this queue, waiting if
364 * necessary up to the specified wait time for space to become available.
366 * @return {@code true} if successful, or {@code false} if
367 * the specified waiting time elapses before space is available.
368 * @throws InterruptedException {@inheritDoc}
369 * @throws NullPointerException {@inheritDoc}
371 public boolean offer(E e, long timeout, TimeUnit unit)
372 throws InterruptedException {
374 if (e == null) throw new NullPointerException();
375 long nanos = unit.toNanos(timeout);
377 final ReentrantLock putLock = this.putLock;
378 final AtomicInteger count = this.count;
379 putLock.lockInterruptibly();
381 while (count.get() == capacity) {
384 nanos = notFull.awaitNanos(nanos);
386 enqueue(new Node<E>(e));
387 c = count.getAndIncrement();
388 if (c + 1 < capacity)
399 * Inserts the specified element at the tail of this queue if it is
400 * possible to do so immediately without exceeding the queue's capacity,
401 * returning {@code true} upon success and {@code false} if this queue
403 * When using a capacity-restricted queue, this method is generally
404 * preferable to method {@link BlockingQueue#add add}, which can fail to
405 * insert an element only by throwing an exception.
407 * @throws NullPointerException if the specified element is null
409 public boolean offer(E e) {
410 if (e == null) throw new NullPointerException();
411 final AtomicInteger count = this.count;
412 if (count.get() == capacity)
415 Node<E> node = new Node(e);
416 final ReentrantLock putLock = this.putLock;
419 if (count.get() < capacity) {
421 c = count.getAndIncrement();
422 if (c + 1 < capacity)
434 public E take() throws InterruptedException {
437 final AtomicInteger count = this.count;
438 final ReentrantLock takeLock = this.takeLock;
439 takeLock.lockInterruptibly();
441 while (count.get() == 0) {
445 c = count.getAndDecrement();
456 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
459 long nanos = unit.toNanos(timeout);
460 final AtomicInteger count = this.count;
461 final ReentrantLock takeLock = this.takeLock;
462 takeLock.lockInterruptibly();
464 while (count.get() == 0) {
467 nanos = notEmpty.awaitNanos(nanos);
470 c = count.getAndDecrement();
482 final AtomicInteger count = this.count;
483 if (count.get() == 0)
487 final ReentrantLock takeLock = this.takeLock;
490 if (count.get() > 0) {
492 c = count.getAndDecrement();
505 if (count.get() == 0)
507 final ReentrantLock takeLock = this.takeLock;
510 Node<E> first = head.next;
521 * Unlinks interior Node p with predecessor trail.
523 void unlink(Node<E> p, Node<E> trail) {
524 // assert isFullyLocked();
525 // p.next is not changed, to allow iterators that are
526 // traversing p to maintain their weak-consistency guarantee.
531 if (count.getAndDecrement() == capacity)
536 * Removes a single instance of the specified element from this queue,
537 * if it is present. More formally, removes an element {@code e} such
538 * that {@code o.equals(e)}, if this queue contains one or more such
540 * Returns {@code true} if this queue contained the specified element
541 * (or equivalently, if this queue changed as a result of the call).
543 * @param o element to be removed from this queue, if present
544 * @return {@code true} if this queue changed as a result of the call
546 public boolean remove(Object o) {
547 if (o == null) return false;
550 for (Node<E> trail = head, p = trail.next;
552 trail = p, p = p.next) {
553 if (o.equals(p.item)) {
565 * Returns {@code true} if this queue contains the specified element.
566 * More formally, returns {@code true} if and only if this queue contains
567 * at least one element {@code e} such that {@code o.equals(e)}.
569 * @param o object to be checked for containment in this queue
570 * @return {@code true} if this queue contains the specified element
572 public boolean contains(Object o) {
573 if (o == null) return false;
576 for (Node<E> p = head.next; p != null; p = p.next)
577 if (o.equals(p.item))
586 * Returns an array containing all of the elements in this queue, in
589 * <p>The returned array will be "safe" in that no references to it are
590 * maintained by this queue. (In other words, this method must allocate
591 * a new array). The caller is thus free to modify the returned array.
593 * <p>This method acts as bridge between array-based and collection-based
596 * @return an array containing all of the elements in this queue
598 public Object[] toArray() {
601 int size = count.get();
602 Object[] a = new Object[size];
604 for (Node<E> p = head.next; p != null; p = p.next)
613 * Returns an array containing all of the elements in this queue, in
614 * proper sequence; the runtime type of the returned array is that of
615 * the specified array. If the queue fits in the specified array, it
616 * is returned therein. Otherwise, a new array is allocated with the
617 * runtime type of the specified array and the size of this queue.
619 * <p>If this queue fits in the specified array with room to spare
620 * (i.e., the array has more elements than this queue), the element in
621 * the array immediately following the end of the queue is set to
624 * <p>Like the {@link #toArray()} method, this method acts as bridge between
625 * array-based and collection-based APIs. Further, this method allows
626 * precise control over the runtime type of the output array, and may,
627 * under certain circumstances, be used to save allocation costs.
629 * <p>Suppose {@code x} is a queue known to contain only strings.
630 * The following code can be used to dump the queue into a newly
631 * allocated array of {@code String}:
634 * String[] y = x.toArray(new String[0]);</pre>
636 * Note that {@code toArray(new Object[0])} is identical in function to
639 * @param a the array into which the elements of the queue are to
640 * be stored, if it is big enough; otherwise, a new array of the
641 * same runtime type is allocated for this purpose
642 * @return an array containing all of the elements in this queue
643 * @throws ArrayStoreException if the runtime type of the specified array
644 * is not a supertype of the runtime type of every element in
646 * @throws NullPointerException if the specified array is null
648 @SuppressWarnings("unchecked")
649 public <T> T[] toArray(T[] a) {
652 int size = count.get();
654 a = (T[])java.lang.reflect.Array.newInstance
655 (a.getClass().getComponentType(), size);
658 for (Node<E> p = head.next; p != null; p = p.next)
668 public String toString() {
671 Node<E> p = head.next;
675 StringBuilder sb = new StringBuilder();
679 sb.append(e == this ? "(this Collection)" : e);
682 return sb.append(']').toString();
683 sb.append(',').append(' ');
691 * Atomically removes all of the elements from this queue.
692 * The queue will be empty after this call returns.
694 public void clear() {
697 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
702 // assert head.item == null && head.next == null;
703 if (count.getAndSet(0) == capacity)
711 * @throws UnsupportedOperationException {@inheritDoc}
712 * @throws ClassCastException {@inheritDoc}
713 * @throws NullPointerException {@inheritDoc}
714 * @throws IllegalArgumentException {@inheritDoc}
716 public int drainTo(Collection<? super E> c) {
717 return drainTo(c, Integer.MAX_VALUE);
721 * @throws UnsupportedOperationException {@inheritDoc}
722 * @throws ClassCastException {@inheritDoc}
723 * @throws NullPointerException {@inheritDoc}
724 * @throws IllegalArgumentException {@inheritDoc}
726 public int drainTo(Collection<? super E> c, int maxElements) {
728 throw new NullPointerException();
730 throw new IllegalArgumentException();
731 boolean signalNotFull = false;
732 final ReentrantLock takeLock = this.takeLock;
735 int n = Math.min(maxElements, count.get());
736 // count.get provides visibility to first n Nodes
750 // Restore invariants even if c.add() threw
752 // assert h.item == null;
754 signalNotFull = (count.getAndAdd(-i) == capacity);
765 * Returns an iterator over the elements in this queue in proper sequence.
766 * The elements will be returned in order from first (head) to last (tail).
768 * <p>The returned iterator is a "weakly consistent" iterator that
769 * will never throw {@link java.util.ConcurrentModificationException
770 * ConcurrentModificationException}, and guarantees to traverse
771 * elements as they existed upon construction of the iterator, and
772 * may (but is not guaranteed to) reflect any modifications
773 * subsequent to construction.
775 * @return an iterator over the elements in this queue in proper sequence
777 public Iterator<E> iterator() {
781 private class Itr implements Iterator<E> {
783 * Basic weakly-consistent iterator. At all times hold the next
784 * item to hand out so that if hasNext() reports true, we will
785 * still have it to return even if lost race with a take etc.
787 private Node<E> current;
788 private Node<E> lastRet;
789 private E currentElement;
796 currentElement = current.item;
802 public boolean hasNext() {
803 return current != null;
807 * Returns the next live successor of p, or null if no such.
809 * Unlike other traversal methods, iterators need to handle both:
810 * - dequeued nodes (p.next == p)
811 * - (possibly multiple) interior removed nodes (p.item == null)
813 private Node<E> nextNode(Node<E> p) {
818 if (s == null || s.item != null)
828 throw new NoSuchElementException();
829 E x = currentElement;
831 current = nextNode(current);
832 currentElement = (current == null) ? null : current.item;
839 public void remove() {
841 throw new IllegalStateException();
844 Node<E> node = lastRet;
846 for (Node<E> trail = head, p = trail.next;
848 trail = p, p = p.next) {
861 * Save the state to a stream (that is, serialize it).
863 * @serialData The capacity is emitted (int), followed by all of
864 * its elements (each an {@code Object}) in the proper order,
866 * @param s the stream
868 private void writeObject(java.io.ObjectOutputStream s)
869 throws java.io.IOException {
873 // Write out any hidden stuff, plus capacity
874 s.defaultWriteObject();
876 // Write out all elements in the proper order.
877 for (Node<E> p = head.next; p != null; p = p.next)
878 s.writeObject(p.item);
880 // Use trailing null as sentinel
888 * Reconstitute this queue instance from a stream (that is,
891 * @param s the stream
893 private void readObject(java.io.ObjectInputStream s)
894 throws java.io.IOException, ClassNotFoundException {
895 // Read in capacity, and any hidden stuff
896 s.defaultReadObject();
899 last = head = new Node<E>(null);
901 // Read in all elements and place in queue
903 @SuppressWarnings("unchecked")
904 E item = (E)s.readObject();