1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/LinkedBlockingQueue.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,910 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +
1.41 +import java.util.concurrent.atomic.AtomicInteger;
1.42 +import java.util.concurrent.locks.Condition;
1.43 +import java.util.concurrent.locks.ReentrantLock;
1.44 +import java.util.AbstractQueue;
1.45 +import java.util.Collection;
1.46 +import java.util.Iterator;
1.47 +import java.util.NoSuchElementException;
1.48 +
1.49 +/**
1.50 + * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
1.51 + * linked nodes.
1.52 + * This queue orders elements FIFO (first-in-first-out).
1.53 + * The <em>head</em> of the queue is that element that has been on the
1.54 + * queue the longest time.
1.55 + * The <em>tail</em> of the queue is that element that has been on the
1.56 + * queue the shortest time. New elements
1.57 + * are inserted at the tail of the queue, and the queue retrieval
1.58 + * operations obtain elements at the head of the queue.
1.59 + * Linked queues typically have higher throughput than array-based queues but
1.60 + * less predictable performance in most concurrent applications.
1.61 + *
1.62 + * <p> The optional capacity bound constructor argument serves as a
1.63 + * way to prevent excessive queue expansion. The capacity, if unspecified,
1.64 + * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
1.65 + * dynamically created upon each insertion unless this would bring the
1.66 + * queue above capacity.
1.67 + *
1.68 + * <p>This class and its iterator implement all of the
1.69 + * <em>optional</em> methods of the {@link Collection} and {@link
1.70 + * Iterator} interfaces.
1.71 + *
1.72 + * <p>This class is a member of the
1.73 + * <a href="{@docRoot}/../technotes/guides/collections/index.html">
1.74 + * Java Collections Framework</a>.
1.75 + *
1.76 + * @since 1.5
1.77 + * @author Doug Lea
1.78 + * @param <E> the type of elements held in this collection
1.79 + *
1.80 + */
1.81 +public class LinkedBlockingQueue<E> extends AbstractQueue<E>
1.82 + implements BlockingQueue<E>, java.io.Serializable {
1.83 + private static final long serialVersionUID = -6903933977591709194L;
1.84 +
1.85 + /*
1.86 + * A variant of the "two lock queue" algorithm. The putLock gates
1.87 + * entry to put (and offer), and has an associated condition for
1.88 + * waiting puts. Similarly for the takeLock. The "count" field
1.89 + * that they both rely on is maintained as an atomic to avoid
1.90 + * needing to get both locks in most cases. Also, to minimize need
1.91 + * for puts to get takeLock and vice-versa, cascading notifies are
1.92 + * used. When a put notices that it has enabled at least one take,
1.93 + * it signals taker. That taker in turn signals others if more
1.94 + * items have been entered since the signal. And symmetrically for
1.95 + * takes signalling puts. Operations such as remove(Object) and
1.96 + * iterators acquire both locks.
1.97 + *
1.98 + * Visibility between writers and readers is provided as follows:
1.99 + *
1.100 + * Whenever an element is enqueued, the putLock is acquired and
1.101 + * count updated. A subsequent reader guarantees visibility to the
1.102 + * enqueued Node by either acquiring the putLock (via fullyLock)
1.103 + * or by acquiring the takeLock, and then reading n = count.get();
1.104 + * this gives visibility to the first n items.
1.105 + *
1.106 + * To implement weakly consistent iterators, it appears we need to
1.107 + * keep all Nodes GC-reachable from a predecessor dequeued Node.
1.108 + * That would cause two problems:
1.109 + * - allow a rogue Iterator to cause unbounded memory retention
1.110 + * - cause cross-generational linking of old Nodes to new Nodes if
1.111 + * a Node was tenured while live, which generational GCs have a
1.112 + * hard time dealing with, causing repeated major collections.
1.113 + * However, only non-deleted Nodes need to be reachable from
1.114 + * dequeued Nodes, and reachability does not necessarily have to
1.115 + * be of the kind understood by the GC. We use the trick of
1.116 + * linking a Node that has just been dequeued to itself. Such a
1.117 + * self-link implicitly means to advance to head.next.
1.118 + */
1.119 +
1.120 + /**
1.121 + * Linked list node class
1.122 + */
1.123 + static class Node<E> {
1.124 + E item;
1.125 +
1.126 + /**
1.127 + * One of:
1.128 + * - the real successor Node
1.129 + * - this Node, meaning the successor is head.next
1.130 + * - null, meaning there is no successor (this is the last node)
1.131 + */
1.132 + Node<E> next;
1.133 +
1.134 + Node(E x) { item = x; }
1.135 + }
1.136 +
1.137 + /** The capacity bound, or Integer.MAX_VALUE if none */
1.138 + private final int capacity;
1.139 +
1.140 + /** Current number of elements */
1.141 + private final AtomicInteger count = new AtomicInteger(0);
1.142 +
1.143 + /**
1.144 + * Head of linked list.
1.145 + * Invariant: head.item == null
1.146 + */
1.147 + private transient Node<E> head;
1.148 +
1.149 + /**
1.150 + * Tail of linked list.
1.151 + * Invariant: last.next == null
1.152 + */
1.153 + private transient Node<E> last;
1.154 +
1.155 + /** Lock held by take, poll, etc */
1.156 + private final ReentrantLock takeLock = new ReentrantLock();
1.157 +
1.158 + /** Wait queue for waiting takes */
1.159 + private final Condition notEmpty = takeLock.newCondition();
1.160 +
1.161 + /** Lock held by put, offer, etc */
1.162 + private final ReentrantLock putLock = new ReentrantLock();
1.163 +
1.164 + /** Wait queue for waiting puts */
1.165 + private final Condition notFull = putLock.newCondition();
1.166 +
1.167 + /**
1.168 + * Signals a waiting take. Called only from put/offer (which do not
1.169 + * otherwise ordinarily lock takeLock.)
1.170 + */
1.171 + private void signalNotEmpty() {
1.172 + final ReentrantLock takeLock = this.takeLock;
1.173 + takeLock.lock();
1.174 + try {
1.175 + notEmpty.signal();
1.176 + } finally {
1.177 + takeLock.unlock();
1.178 + }
1.179 + }
1.180 +
1.181 + /**
1.182 + * Signals a waiting put. Called only from take/poll.
1.183 + */
1.184 + private void signalNotFull() {
1.185 + final ReentrantLock putLock = this.putLock;
1.186 + putLock.lock();
1.187 + try {
1.188 + notFull.signal();
1.189 + } finally {
1.190 + putLock.unlock();
1.191 + }
1.192 + }
1.193 +
1.194 + /**
1.195 + * Links node at end of queue.
1.196 + *
1.197 + * @param node the node
1.198 + */
1.199 + private void enqueue(Node<E> node) {
1.200 + // assert putLock.isHeldByCurrentThread();
1.201 + // assert last.next == null;
1.202 + last = last.next = node;
1.203 + }
1.204 +
1.205 + /**
1.206 + * Removes a node from head of queue.
1.207 + *
1.208 + * @return the node
1.209 + */
1.210 + private E dequeue() {
1.211 + // assert takeLock.isHeldByCurrentThread();
1.212 + // assert head.item == null;
1.213 + Node<E> h = head;
1.214 + Node<E> first = h.next;
1.215 + h.next = h; // help GC
1.216 + head = first;
1.217 + E x = first.item;
1.218 + first.item = null;
1.219 + return x;
1.220 + }
1.221 +
1.222 + /**
1.223 + * Lock to prevent both puts and takes.
1.224 + */
1.225 + void fullyLock() {
1.226 + putLock.lock();
1.227 + takeLock.lock();
1.228 + }
1.229 +
1.230 + /**
1.231 + * Unlock to allow both puts and takes.
1.232 + */
1.233 + void fullyUnlock() {
1.234 + takeLock.unlock();
1.235 + putLock.unlock();
1.236 + }
1.237 +
1.238 +// /**
1.239 +// * Tells whether both locks are held by current thread.
1.240 +// */
1.241 +// boolean isFullyLocked() {
1.242 +// return (putLock.isHeldByCurrentThread() &&
1.243 +// takeLock.isHeldByCurrentThread());
1.244 +// }
1.245 +
1.246 + /**
1.247 + * Creates a {@code LinkedBlockingQueue} with a capacity of
1.248 + * {@link Integer#MAX_VALUE}.
1.249 + */
1.250 + public LinkedBlockingQueue() {
1.251 + this(Integer.MAX_VALUE);
1.252 + }
1.253 +
1.254 + /**
1.255 + * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
1.256 + *
1.257 + * @param capacity the capacity of this queue
1.258 + * @throws IllegalArgumentException if {@code capacity} is not greater
1.259 + * than zero
1.260 + */
1.261 + public LinkedBlockingQueue(int capacity) {
1.262 + if (capacity <= 0) throw new IllegalArgumentException();
1.263 + this.capacity = capacity;
1.264 + last = head = new Node<E>(null);
1.265 + }
1.266 +
1.267 + /**
1.268 + * Creates a {@code LinkedBlockingQueue} with a capacity of
1.269 + * {@link Integer#MAX_VALUE}, initially containing the elements of the
1.270 + * given collection,
1.271 + * added in traversal order of the collection's iterator.
1.272 + *
1.273 + * @param c the collection of elements to initially contain
1.274 + * @throws NullPointerException if the specified collection or any
1.275 + * of its elements are null
1.276 + */
1.277 + public LinkedBlockingQueue(Collection<? extends E> c) {
1.278 + this(Integer.MAX_VALUE);
1.279 + final ReentrantLock putLock = this.putLock;
1.280 + putLock.lock(); // Never contended, but necessary for visibility
1.281 + try {
1.282 + int n = 0;
1.283 + for (E e : c) {
1.284 + if (e == null)
1.285 + throw new NullPointerException();
1.286 + if (n == capacity)
1.287 + throw new IllegalStateException("Queue full");
1.288 + enqueue(new Node<E>(e));
1.289 + ++n;
1.290 + }
1.291 + count.set(n);
1.292 + } finally {
1.293 + putLock.unlock();
1.294 + }
1.295 + }
1.296 +
1.297 +
1.298 + // this doc comment is overridden to remove the reference to collections
1.299 + // greater in size than Integer.MAX_VALUE
1.300 + /**
1.301 + * Returns the number of elements in this queue.
1.302 + *
1.303 + * @return the number of elements in this queue
1.304 + */
1.305 + public int size() {
1.306 + return count.get();
1.307 + }
1.308 +
1.309 + // this doc comment is a modified copy of the inherited doc comment,
1.310 + // without the reference to unlimited queues.
1.311 + /**
1.312 + * Returns the number of additional elements that this queue can ideally
1.313 + * (in the absence of memory or resource constraints) accept without
1.314 + * blocking. This is always equal to the initial capacity of this queue
1.315 + * less the current {@code size} of this queue.
1.316 + *
1.317 + * <p>Note that you <em>cannot</em> always tell if an attempt to insert
1.318 + * an element will succeed by inspecting {@code remainingCapacity}
1.319 + * because it may be the case that another thread is about to
1.320 + * insert or remove an element.
1.321 + */
1.322 + public int remainingCapacity() {
1.323 + return capacity - count.get();
1.324 + }
1.325 +
1.326 + /**
1.327 + * Inserts the specified element at the tail of this queue, waiting if
1.328 + * necessary for space to become available.
1.329 + *
1.330 + * @throws InterruptedException {@inheritDoc}
1.331 + * @throws NullPointerException {@inheritDoc}
1.332 + */
1.333 + public void put(E e) throws InterruptedException {
1.334 + if (e == null) throw new NullPointerException();
1.335 + // Note: convention in all put/take/etc is to preset local var
1.336 + // holding count negative to indicate failure unless set.
1.337 + int c = -1;
1.338 + Node<E> node = new Node(e);
1.339 + final ReentrantLock putLock = this.putLock;
1.340 + final AtomicInteger count = this.count;
1.341 + putLock.lockInterruptibly();
1.342 + try {
1.343 + /*
1.344 + * Note that count is used in wait guard even though it is
1.345 + * not protected by lock. This works because count can
1.346 + * only decrease at this point (all other puts are shut
1.347 + * out by lock), and we (or some other waiting put) are
1.348 + * signalled if it ever changes from capacity. Similarly
1.349 + * for all other uses of count in other wait guards.
1.350 + */
1.351 + while (count.get() == capacity) {
1.352 + notFull.await();
1.353 + }
1.354 + enqueue(node);
1.355 + c = count.getAndIncrement();
1.356 + if (c + 1 < capacity)
1.357 + notFull.signal();
1.358 + } finally {
1.359 + putLock.unlock();
1.360 + }
1.361 + if (c == 0)
1.362 + signalNotEmpty();
1.363 + }
1.364 +
1.365 + /**
1.366 + * Inserts the specified element at the tail of this queue, waiting if
1.367 + * necessary up to the specified wait time for space to become available.
1.368 + *
1.369 + * @return {@code true} if successful, or {@code false} if
1.370 + * the specified waiting time elapses before space is available.
1.371 + * @throws InterruptedException {@inheritDoc}
1.372 + * @throws NullPointerException {@inheritDoc}
1.373 + */
1.374 + public boolean offer(E e, long timeout, TimeUnit unit)
1.375 + throws InterruptedException {
1.376 +
1.377 + if (e == null) throw new NullPointerException();
1.378 + long nanos = unit.toNanos(timeout);
1.379 + int c = -1;
1.380 + final ReentrantLock putLock = this.putLock;
1.381 + final AtomicInteger count = this.count;
1.382 + putLock.lockInterruptibly();
1.383 + try {
1.384 + while (count.get() == capacity) {
1.385 + if (nanos <= 0)
1.386 + return false;
1.387 + nanos = notFull.awaitNanos(nanos);
1.388 + }
1.389 + enqueue(new Node<E>(e));
1.390 + c = count.getAndIncrement();
1.391 + if (c + 1 < capacity)
1.392 + notFull.signal();
1.393 + } finally {
1.394 + putLock.unlock();
1.395 + }
1.396 + if (c == 0)
1.397 + signalNotEmpty();
1.398 + return true;
1.399 + }
1.400 +
1.401 + /**
1.402 + * Inserts the specified element at the tail of this queue if it is
1.403 + * possible to do so immediately without exceeding the queue's capacity,
1.404 + * returning {@code true} upon success and {@code false} if this queue
1.405 + * is full.
1.406 + * When using a capacity-restricted queue, this method is generally
1.407 + * preferable to method {@link BlockingQueue#add add}, which can fail to
1.408 + * insert an element only by throwing an exception.
1.409 + *
1.410 + * @throws NullPointerException if the specified element is null
1.411 + */
1.412 + public boolean offer(E e) {
1.413 + if (e == null) throw new NullPointerException();
1.414 + final AtomicInteger count = this.count;
1.415 + if (count.get() == capacity)
1.416 + return false;
1.417 + int c = -1;
1.418 + Node<E> node = new Node(e);
1.419 + final ReentrantLock putLock = this.putLock;
1.420 + putLock.lock();
1.421 + try {
1.422 + if (count.get() < capacity) {
1.423 + enqueue(node);
1.424 + c = count.getAndIncrement();
1.425 + if (c + 1 < capacity)
1.426 + notFull.signal();
1.427 + }
1.428 + } finally {
1.429 + putLock.unlock();
1.430 + }
1.431 + if (c == 0)
1.432 + signalNotEmpty();
1.433 + return c >= 0;
1.434 + }
1.435 +
1.436 +
1.437 + public E take() throws InterruptedException {
1.438 + E x;
1.439 + int c = -1;
1.440 + final AtomicInteger count = this.count;
1.441 + final ReentrantLock takeLock = this.takeLock;
1.442 + takeLock.lockInterruptibly();
1.443 + try {
1.444 + while (count.get() == 0) {
1.445 + notEmpty.await();
1.446 + }
1.447 + x = dequeue();
1.448 + c = count.getAndDecrement();
1.449 + if (c > 1)
1.450 + notEmpty.signal();
1.451 + } finally {
1.452 + takeLock.unlock();
1.453 + }
1.454 + if (c == capacity)
1.455 + signalNotFull();
1.456 + return x;
1.457 + }
1.458 +
1.459 + public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1.460 + E x = null;
1.461 + int c = -1;
1.462 + long nanos = unit.toNanos(timeout);
1.463 + final AtomicInteger count = this.count;
1.464 + final ReentrantLock takeLock = this.takeLock;
1.465 + takeLock.lockInterruptibly();
1.466 + try {
1.467 + while (count.get() == 0) {
1.468 + if (nanos <= 0)
1.469 + return null;
1.470 + nanos = notEmpty.awaitNanos(nanos);
1.471 + }
1.472 + x = dequeue();
1.473 + c = count.getAndDecrement();
1.474 + if (c > 1)
1.475 + notEmpty.signal();
1.476 + } finally {
1.477 + takeLock.unlock();
1.478 + }
1.479 + if (c == capacity)
1.480 + signalNotFull();
1.481 + return x;
1.482 + }
1.483 +
1.484 + public E poll() {
1.485 + final AtomicInteger count = this.count;
1.486 + if (count.get() == 0)
1.487 + return null;
1.488 + E x = null;
1.489 + int c = -1;
1.490 + final ReentrantLock takeLock = this.takeLock;
1.491 + takeLock.lock();
1.492 + try {
1.493 + if (count.get() > 0) {
1.494 + x = dequeue();
1.495 + c = count.getAndDecrement();
1.496 + if (c > 1)
1.497 + notEmpty.signal();
1.498 + }
1.499 + } finally {
1.500 + takeLock.unlock();
1.501 + }
1.502 + if (c == capacity)
1.503 + signalNotFull();
1.504 + return x;
1.505 + }
1.506 +
1.507 + public E peek() {
1.508 + if (count.get() == 0)
1.509 + return null;
1.510 + final ReentrantLock takeLock = this.takeLock;
1.511 + takeLock.lock();
1.512 + try {
1.513 + Node<E> first = head.next;
1.514 + if (first == null)
1.515 + return null;
1.516 + else
1.517 + return first.item;
1.518 + } finally {
1.519 + takeLock.unlock();
1.520 + }
1.521 + }
1.522 +
1.523 + /**
1.524 + * Unlinks interior Node p with predecessor trail.
1.525 + */
1.526 + void unlink(Node<E> p, Node<E> trail) {
1.527 + // assert isFullyLocked();
1.528 + // p.next is not changed, to allow iterators that are
1.529 + // traversing p to maintain their weak-consistency guarantee.
1.530 + p.item = null;
1.531 + trail.next = p.next;
1.532 + if (last == p)
1.533 + last = trail;
1.534 + if (count.getAndDecrement() == capacity)
1.535 + notFull.signal();
1.536 + }
1.537 +
1.538 + /**
1.539 + * Removes a single instance of the specified element from this queue,
1.540 + * if it is present. More formally, removes an element {@code e} such
1.541 + * that {@code o.equals(e)}, if this queue contains one or more such
1.542 + * elements.
1.543 + * Returns {@code true} if this queue contained the specified element
1.544 + * (or equivalently, if this queue changed as a result of the call).
1.545 + *
1.546 + * @param o element to be removed from this queue, if present
1.547 + * @return {@code true} if this queue changed as a result of the call
1.548 + */
1.549 + public boolean remove(Object o) {
1.550 + if (o == null) return false;
1.551 + fullyLock();
1.552 + try {
1.553 + for (Node<E> trail = head, p = trail.next;
1.554 + p != null;
1.555 + trail = p, p = p.next) {
1.556 + if (o.equals(p.item)) {
1.557 + unlink(p, trail);
1.558 + return true;
1.559 + }
1.560 + }
1.561 + return false;
1.562 + } finally {
1.563 + fullyUnlock();
1.564 + }
1.565 + }
1.566 +
1.567 + /**
1.568 + * Returns {@code true} if this queue contains the specified element.
1.569 + * More formally, returns {@code true} if and only if this queue contains
1.570 + * at least one element {@code e} such that {@code o.equals(e)}.
1.571 + *
1.572 + * @param o object to be checked for containment in this queue
1.573 + * @return {@code true} if this queue contains the specified element
1.574 + */
1.575 + public boolean contains(Object o) {
1.576 + if (o == null) return false;
1.577 + fullyLock();
1.578 + try {
1.579 + for (Node<E> p = head.next; p != null; p = p.next)
1.580 + if (o.equals(p.item))
1.581 + return true;
1.582 + return false;
1.583 + } finally {
1.584 + fullyUnlock();
1.585 + }
1.586 + }
1.587 +
1.588 + /**
1.589 + * Returns an array containing all of the elements in this queue, in
1.590 + * proper sequence.
1.591 + *
1.592 + * <p>The returned array will be "safe" in that no references to it are
1.593 + * maintained by this queue. (In other words, this method must allocate
1.594 + * a new array). The caller is thus free to modify the returned array.
1.595 + *
1.596 + * <p>This method acts as bridge between array-based and collection-based
1.597 + * APIs.
1.598 + *
1.599 + * @return an array containing all of the elements in this queue
1.600 + */
1.601 + public Object[] toArray() {
1.602 + fullyLock();
1.603 + try {
1.604 + int size = count.get();
1.605 + Object[] a = new Object[size];
1.606 + int k = 0;
1.607 + for (Node<E> p = head.next; p != null; p = p.next)
1.608 + a[k++] = p.item;
1.609 + return a;
1.610 + } finally {
1.611 + fullyUnlock();
1.612 + }
1.613 + }
1.614 +
1.615 + /**
1.616 + * Returns an array containing all of the elements in this queue, in
1.617 + * proper sequence; the runtime type of the returned array is that of
1.618 + * the specified array. If the queue fits in the specified array, it
1.619 + * is returned therein. Otherwise, a new array is allocated with the
1.620 + * runtime type of the specified array and the size of this queue.
1.621 + *
1.622 + * <p>If this queue fits in the specified array with room to spare
1.623 + * (i.e., the array has more elements than this queue), the element in
1.624 + * the array immediately following the end of the queue is set to
1.625 + * {@code null}.
1.626 + *
1.627 + * <p>Like the {@link #toArray()} method, this method acts as bridge between
1.628 + * array-based and collection-based APIs. Further, this method allows
1.629 + * precise control over the runtime type of the output array, and may,
1.630 + * under certain circumstances, be used to save allocation costs.
1.631 + *
1.632 + * <p>Suppose {@code x} is a queue known to contain only strings.
1.633 + * The following code can be used to dump the queue into a newly
1.634 + * allocated array of {@code String}:
1.635 + *
1.636 + * <pre>
1.637 + * String[] y = x.toArray(new String[0]);</pre>
1.638 + *
1.639 + * Note that {@code toArray(new Object[0])} is identical in function to
1.640 + * {@code toArray()}.
1.641 + *
1.642 + * @param a the array into which the elements of the queue are to
1.643 + * be stored, if it is big enough; otherwise, a new array of the
1.644 + * same runtime type is allocated for this purpose
1.645 + * @return an array containing all of the elements in this queue
1.646 + * @throws ArrayStoreException if the runtime type of the specified array
1.647 + * is not a supertype of the runtime type of every element in
1.648 + * this queue
1.649 + * @throws NullPointerException if the specified array is null
1.650 + */
1.651 + @SuppressWarnings("unchecked")
1.652 + public <T> T[] toArray(T[] a) {
1.653 + fullyLock();
1.654 + try {
1.655 + int size = count.get();
1.656 + if (a.length < size)
1.657 + a = (T[])java.lang.reflect.Array.newInstance
1.658 + (a.getClass().getComponentType(), size);
1.659 +
1.660 + int k = 0;
1.661 + for (Node<E> p = head.next; p != null; p = p.next)
1.662 + a[k++] = (T)p.item;
1.663 + if (a.length > k)
1.664 + a[k] = null;
1.665 + return a;
1.666 + } finally {
1.667 + fullyUnlock();
1.668 + }
1.669 + }
1.670 +
1.671 + public String toString() {
1.672 + fullyLock();
1.673 + try {
1.674 + Node<E> p = head.next;
1.675 + if (p == null)
1.676 + return "[]";
1.677 +
1.678 + StringBuilder sb = new StringBuilder();
1.679 + sb.append('[');
1.680 + for (;;) {
1.681 + E e = p.item;
1.682 + sb.append(e == this ? "(this Collection)" : e);
1.683 + p = p.next;
1.684 + if (p == null)
1.685 + return sb.append(']').toString();
1.686 + sb.append(',').append(' ');
1.687 + }
1.688 + } finally {
1.689 + fullyUnlock();
1.690 + }
1.691 + }
1.692 +
1.693 + /**
1.694 + * Atomically removes all of the elements from this queue.
1.695 + * The queue will be empty after this call returns.
1.696 + */
1.697 + public void clear() {
1.698 + fullyLock();
1.699 + try {
1.700 + for (Node<E> p, h = head; (p = h.next) != null; h = p) {
1.701 + h.next = h;
1.702 + p.item = null;
1.703 + }
1.704 + head = last;
1.705 + // assert head.item == null && head.next == null;
1.706 + if (count.getAndSet(0) == capacity)
1.707 + notFull.signal();
1.708 + } finally {
1.709 + fullyUnlock();
1.710 + }
1.711 + }
1.712 +
1.713 + /**
1.714 + * @throws UnsupportedOperationException {@inheritDoc}
1.715 + * @throws ClassCastException {@inheritDoc}
1.716 + * @throws NullPointerException {@inheritDoc}
1.717 + * @throws IllegalArgumentException {@inheritDoc}
1.718 + */
1.719 + public int drainTo(Collection<? super E> c) {
1.720 + return drainTo(c, Integer.MAX_VALUE);
1.721 + }
1.722 +
1.723 + /**
1.724 + * @throws UnsupportedOperationException {@inheritDoc}
1.725 + * @throws ClassCastException {@inheritDoc}
1.726 + * @throws NullPointerException {@inheritDoc}
1.727 + * @throws IllegalArgumentException {@inheritDoc}
1.728 + */
1.729 + public int drainTo(Collection<? super E> c, int maxElements) {
1.730 + if (c == null)
1.731 + throw new NullPointerException();
1.732 + if (c == this)
1.733 + throw new IllegalArgumentException();
1.734 + boolean signalNotFull = false;
1.735 + final ReentrantLock takeLock = this.takeLock;
1.736 + takeLock.lock();
1.737 + try {
1.738 + int n = Math.min(maxElements, count.get());
1.739 + // count.get provides visibility to first n Nodes
1.740 + Node<E> h = head;
1.741 + int i = 0;
1.742 + try {
1.743 + while (i < n) {
1.744 + Node<E> p = h.next;
1.745 + c.add(p.item);
1.746 + p.item = null;
1.747 + h.next = h;
1.748 + h = p;
1.749 + ++i;
1.750 + }
1.751 + return n;
1.752 + } finally {
1.753 + // Restore invariants even if c.add() threw
1.754 + if (i > 0) {
1.755 + // assert h.item == null;
1.756 + head = h;
1.757 + signalNotFull = (count.getAndAdd(-i) == capacity);
1.758 + }
1.759 + }
1.760 + } finally {
1.761 + takeLock.unlock();
1.762 + if (signalNotFull)
1.763 + signalNotFull();
1.764 + }
1.765 + }
1.766 +
1.767 + /**
1.768 + * Returns an iterator over the elements in this queue in proper sequence.
1.769 + * The elements will be returned in order from first (head) to last (tail).
1.770 + *
1.771 + * <p>The returned iterator is a "weakly consistent" iterator that
1.772 + * will never throw {@link java.util.ConcurrentModificationException
1.773 + * ConcurrentModificationException}, and guarantees to traverse
1.774 + * elements as they existed upon construction of the iterator, and
1.775 + * may (but is not guaranteed to) reflect any modifications
1.776 + * subsequent to construction.
1.777 + *
1.778 + * @return an iterator over the elements in this queue in proper sequence
1.779 + */
1.780 + public Iterator<E> iterator() {
1.781 + return new Itr();
1.782 + }
1.783 +
1.784 + private class Itr implements Iterator<E> {
1.785 + /*
1.786 + * Basic weakly-consistent iterator. At all times hold the next
1.787 + * item to hand out so that if hasNext() reports true, we will
1.788 + * still have it to return even if lost race with a take etc.
1.789 + */
1.790 + private Node<E> current;
1.791 + private Node<E> lastRet;
1.792 + private E currentElement;
1.793 +
1.794 + Itr() {
1.795 + fullyLock();
1.796 + try {
1.797 + current = head.next;
1.798 + if (current != null)
1.799 + currentElement = current.item;
1.800 + } finally {
1.801 + fullyUnlock();
1.802 + }
1.803 + }
1.804 +
1.805 + public boolean hasNext() {
1.806 + return current != null;
1.807 + }
1.808 +
1.809 + /**
1.810 + * Returns the next live successor of p, or null if no such.
1.811 + *
1.812 + * Unlike other traversal methods, iterators need to handle both:
1.813 + * - dequeued nodes (p.next == p)
1.814 + * - (possibly multiple) interior removed nodes (p.item == null)
1.815 + */
1.816 + private Node<E> nextNode(Node<E> p) {
1.817 + for (;;) {
1.818 + Node<E> s = p.next;
1.819 + if (s == p)
1.820 + return head.next;
1.821 + if (s == null || s.item != null)
1.822 + return s;
1.823 + p = s;
1.824 + }
1.825 + }
1.826 +
1.827 + public E next() {
1.828 + fullyLock();
1.829 + try {
1.830 + if (current == null)
1.831 + throw new NoSuchElementException();
1.832 + E x = currentElement;
1.833 + lastRet = current;
1.834 + current = nextNode(current);
1.835 + currentElement = (current == null) ? null : current.item;
1.836 + return x;
1.837 + } finally {
1.838 + fullyUnlock();
1.839 + }
1.840 + }
1.841 +
1.842 + public void remove() {
1.843 + if (lastRet == null)
1.844 + throw new IllegalStateException();
1.845 + fullyLock();
1.846 + try {
1.847 + Node<E> node = lastRet;
1.848 + lastRet = null;
1.849 + for (Node<E> trail = head, p = trail.next;
1.850 + p != null;
1.851 + trail = p, p = p.next) {
1.852 + if (p == node) {
1.853 + unlink(p, trail);
1.854 + break;
1.855 + }
1.856 + }
1.857 + } finally {
1.858 + fullyUnlock();
1.859 + }
1.860 + }
1.861 + }
1.862 +
1.863 + /**
1.864 + * Save the state to a stream (that is, serialize it).
1.865 + *
1.866 + * @serialData The capacity is emitted (int), followed by all of
1.867 + * its elements (each an {@code Object}) in the proper order,
1.868 + * followed by a null
1.869 + * @param s the stream
1.870 + */
1.871 + private void writeObject(java.io.ObjectOutputStream s)
1.872 + throws java.io.IOException {
1.873 +
1.874 + fullyLock();
1.875 + try {
1.876 + // Write out any hidden stuff, plus capacity
1.877 + s.defaultWriteObject();
1.878 +
1.879 + // Write out all elements in the proper order.
1.880 + for (Node<E> p = head.next; p != null; p = p.next)
1.881 + s.writeObject(p.item);
1.882 +
1.883 + // Use trailing null as sentinel
1.884 + s.writeObject(null);
1.885 + } finally {
1.886 + fullyUnlock();
1.887 + }
1.888 + }
1.889 +
1.890 + /**
1.891 + * Reconstitute this queue instance from a stream (that is,
1.892 + * deserialize it).
1.893 + *
1.894 + * @param s the stream
1.895 + */
1.896 + private void readObject(java.io.ObjectInputStream s)
1.897 + throws java.io.IOException, ClassNotFoundException {
1.898 + // Read in capacity, and any hidden stuff
1.899 + s.defaultReadObject();
1.900 +
1.901 + count.set(0);
1.902 + last = head = new Node<E>(null);
1.903 +
1.904 + // Read in all elements and place in queue
1.905 + for (;;) {
1.906 + @SuppressWarnings("unchecked")
1.907 + E item = (E)s.readObject();
1.908 + if (item == null)
1.909 + break;
1.910 + add(item);
1.911 + }
1.912 + }
1.913 +}