diff -r 000000000000 -r 212417b74b72 rt/emul/compact/src/main/java/java/util/concurrent/CyclicBarrier.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/CyclicBarrier.java Sat Mar 19 10:46:31 2016 +0100 @@ -0,0 +1,483 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package java.util.concurrent; +import java.util.concurrent.locks.*; + +/** + * A synchronization aid that allows a set of threads to all wait for + * each other to reach a common barrier point. CyclicBarriers are + * useful in programs involving a fixed sized party of threads that + * must occasionally wait for each other. The barrier is called + * cyclic because it can be re-used after the waiting threads + * are released. + * + *

A CyclicBarrier supports an optional {@link Runnable} command + * that is run once per barrier point, after the last thread in the party + * arrives, but before any threads are released. + * This barrier action is useful + * for updating shared-state before any of the parties continue. + * + *

Sample usage: Here is an example of + * using a barrier in a parallel decomposition design: + *

+ * class Solver {
+ *   final int N;
+ *   final float[][] data;
+ *   final CyclicBarrier barrier;
+ *
+ *   class Worker implements Runnable {
+ *     int myRow;
+ *     Worker(int row) { myRow = row; }
+ *     public void run() {
+ *       while (!done()) {
+ *         processRow(myRow);
+ *
+ *         try {
+ *           barrier.await();
+ *         } catch (InterruptedException ex) {
+ *           return;
+ *         } catch (BrokenBarrierException ex) {
+ *           return;
+ *         }
+ *       }
+ *     }
+ *   }
+ *
+ *   public Solver(float[][] matrix) {
+ *     data = matrix;
+ *     N = matrix.length;
+ *     barrier = new CyclicBarrier(N,
+ *                                 new Runnable() {
+ *                                   public void run() {
+ *                                     mergeRows(...);
+ *                                   }
+ *                                 });
+ *     for (int i = 0; i < N; ++i)
+ *       new Thread(new Worker(i)).start();
+ *
+ *     waitUntilDone();
+ *   }
+ * }
+ * 
+ * Here, each worker thread processes a row of the matrix then waits at the + * barrier until all rows have been processed. When all rows are processed + * the supplied {@link Runnable} barrier action is executed and merges the + * rows. If the merger + * determines that a solution has been found then done() will return + * true and each worker will terminate. + * + *

If the barrier action does not rely on the parties being suspended when + * it is executed, then any of the threads in the party could execute that + * action when it is released. To facilitate this, each invocation of + * {@link #await} returns the arrival index of that thread at the barrier. + * You can then choose which thread should execute the barrier action, for + * example: + *

  if (barrier.await() == 0) {
+ *     // log the completion of this iteration
+ *   }
+ * + *

The CyclicBarrier uses an all-or-none breakage model + * for failed synchronization attempts: If a thread leaves a barrier + * point prematurely because of interruption, failure, or timeout, all + * other threads waiting at that barrier point will also leave + * abnormally via {@link BrokenBarrierException} (or + * {@link InterruptedException} if they too were interrupted at about + * the same time). + * + *

Memory consistency effects: Actions in a thread prior to calling + * {@code await()} + * happen-before + * actions that are part of the barrier action, which in turn + * happen-before actions following a successful return from the + * corresponding {@code await()} in other threads. + * + * @since 1.5 + * @see CountDownLatch + * + * @author Doug Lea + */ +public class CyclicBarrier { + /** + * Each use of the barrier is represented as a generation instance. + * The generation changes whenever the barrier is tripped, or + * is reset. There can be many generations associated with threads + * using the barrier - due to the non-deterministic way the lock + * may be allocated to waiting threads - but only one of these + * can be active at a time (the one to which count applies) + * and all the rest are either broken or tripped. + * There need not be an active generation if there has been a break + * but no subsequent reset. + */ + private static class Generation { + boolean broken = false; + } + + /** The lock for guarding barrier entry */ + private final ReentrantLock lock = new ReentrantLock(); + /** Condition to wait on until tripped */ + private final Condition trip = lock.newCondition(); + /** The number of parties */ + private final int parties; + /* The command to run when tripped */ + private final Runnable barrierCommand; + /** The current generation */ + private Generation generation = new Generation(); + + /** + * Number of parties still waiting. Counts down from parties to 0 + * on each generation. It is reset to parties on each new + * generation or when broken. + */ + private int count; + + /** + * Updates state on barrier trip and wakes up everyone. + * Called only while holding lock. + */ + private void nextGeneration() { + // signal completion of last generation + trip.signalAll(); + // set up next generation + count = parties; + generation = new Generation(); + } + + /** + * Sets current barrier generation as broken and wakes up everyone. + * Called only while holding lock. + */ + private void breakBarrier() { + generation.broken = true; + count = parties; + trip.signalAll(); + } + + /** + * Main barrier code, covering the various policies. + */ + private int dowait(boolean timed, long nanos) + throws InterruptedException, BrokenBarrierException, + TimeoutException { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + final Generation g = generation; + + if (g.broken) + throw new BrokenBarrierException(); + + if (Thread.interrupted()) { + breakBarrier(); + throw new InterruptedException(); + } + + int index = --count; + if (index == 0) { // tripped + boolean ranAction = false; + try { + final Runnable command = barrierCommand; + if (command != null) + command.run(); + ranAction = true; + nextGeneration(); + return 0; + } finally { + if (!ranAction) + breakBarrier(); + } + } + + // loop until tripped, broken, interrupted, or timed out + for (;;) { + try { + if (!timed) + trip.await(); + else if (nanos > 0L) + nanos = trip.awaitNanos(nanos); + } catch (InterruptedException ie) { + if (g == generation && ! g.broken) { + breakBarrier(); + throw ie; + } else { + // We're about to finish waiting even if we had not + // been interrupted, so this interrupt is deemed to + // "belong" to subsequent execution. + Thread.currentThread().interrupt(); + } + } + + if (g.broken) + throw new BrokenBarrierException(); + + if (g != generation) + return index; + + if (timed && nanos <= 0L) { + breakBarrier(); + throw new TimeoutException(); + } + } + } finally { + lock.unlock(); + } + } + + /** + * Creates a new CyclicBarrier that will trip when the + * given number of parties (threads) are waiting upon it, and which + * will execute the given barrier action when the barrier is tripped, + * performed by the last thread entering the barrier. + * + * @param parties the number of threads that must invoke {@link #await} + * before the barrier is tripped + * @param barrierAction the command to execute when the barrier is + * tripped, or {@code null} if there is no action + * @throws IllegalArgumentException if {@code parties} is less than 1 + */ + public CyclicBarrier(int parties, Runnable barrierAction) { + if (parties <= 0) throw new IllegalArgumentException(); + this.parties = parties; + this.count = parties; + this.barrierCommand = barrierAction; + } + + /** + * Creates a new CyclicBarrier that will trip when the + * given number of parties (threads) are waiting upon it, and + * does not perform a predefined action when the barrier is tripped. + * + * @param parties the number of threads that must invoke {@link #await} + * before the barrier is tripped + * @throws IllegalArgumentException if {@code parties} is less than 1 + */ + public CyclicBarrier(int parties) { + this(parties, null); + } + + /** + * Returns the number of parties required to trip this barrier. + * + * @return the number of parties required to trip this barrier + */ + public int getParties() { + return parties; + } + + /** + * Waits until all {@linkplain #getParties parties} have invoked + * await on this barrier. + * + *

If the current thread is not the last to arrive then it is + * disabled for thread scheduling purposes and lies dormant until + * one of the following things happens: + *

+ * + *

If the current thread: + *

+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

If the barrier is {@link #reset} while any thread is waiting, + * or if the barrier {@linkplain #isBroken is broken} when + * await is invoked, or while any thread is waiting, then + * {@link BrokenBarrierException} is thrown. + * + *

If any thread is {@linkplain Thread#interrupt interrupted} while waiting, + * then all other waiting threads will throw + * {@link BrokenBarrierException} and the barrier is placed in the broken + * state. + * + *

If the current thread is the last thread to arrive, and a + * non-null barrier action was supplied in the constructor, then the + * current thread runs the action before allowing the other threads to + * continue. + * If an exception occurs during the barrier action then that exception + * will be propagated in the current thread and the barrier is placed in + * the broken state. + * + * @return the arrival index of the current thread, where index + * {@link #getParties()} - 1 indicates the first + * to arrive and zero indicates the last to arrive + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws BrokenBarrierException if another thread was + * interrupted or timed out while the current thread was + * waiting, or the barrier was reset, or the barrier was + * broken when {@code await} was called, or the barrier + * action (if present) failed due an exception. + */ + public int await() throws InterruptedException, BrokenBarrierException { + try { + return dowait(false, 0L); + } catch (TimeoutException toe) { + throw new Error(toe); // cannot happen; + } + } + + /** + * Waits until all {@linkplain #getParties parties} have invoked + * await on this barrier, or the specified waiting time elapses. + * + *

If the current thread is not the last to arrive then it is + * disabled for thread scheduling purposes and lies dormant until + * one of the following things happens: + *

+ * + *

If the current thread: + *

+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

If the specified waiting time elapses then {@link TimeoutException} + * is thrown. If the time is less than or equal to zero, the + * method will not wait at all. + * + *

If the barrier is {@link #reset} while any thread is waiting, + * or if the barrier {@linkplain #isBroken is broken} when + * await is invoked, or while any thread is waiting, then + * {@link BrokenBarrierException} is thrown. + * + *

If any thread is {@linkplain Thread#interrupt interrupted} while + * waiting, then all other waiting threads will throw {@link + * BrokenBarrierException} and the barrier is placed in the broken + * state. + * + *

If the current thread is the last thread to arrive, and a + * non-null barrier action was supplied in the constructor, then the + * current thread runs the action before allowing the other threads to + * continue. + * If an exception occurs during the barrier action then that exception + * will be propagated in the current thread and the barrier is placed in + * the broken state. + * + * @param timeout the time to wait for the barrier + * @param unit the time unit of the timeout parameter + * @return the arrival index of the current thread, where index + * {@link #getParties()} - 1 indicates the first + * to arrive and zero indicates the last to arrive + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws TimeoutException if the specified timeout elapses + * @throws BrokenBarrierException if another thread was + * interrupted or timed out while the current thread was + * waiting, or the barrier was reset, or the barrier was broken + * when {@code await} was called, or the barrier action (if + * present) failed due an exception + */ + public int await(long timeout, TimeUnit unit) + throws InterruptedException, + BrokenBarrierException, + TimeoutException { + return dowait(true, unit.toNanos(timeout)); + } + + /** + * Queries if this barrier is in a broken state. + * + * @return {@code true} if one or more parties broke out of this + * barrier due to interruption or timeout since + * construction or the last reset, or a barrier action + * failed due to an exception; {@code false} otherwise. + */ + public boolean isBroken() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return generation.broken; + } finally { + lock.unlock(); + } + } + + /** + * Resets the barrier to its initial state. If any parties are + * currently waiting at the barrier, they will return with a + * {@link BrokenBarrierException}. Note that resets after + * a breakage has occurred for other reasons can be complicated to + * carry out; threads need to re-synchronize in some other way, + * and choose one to perform the reset. It may be preferable to + * instead create a new barrier for subsequent use. + */ + public void reset() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + breakBarrier(); // break the current generation + nextGeneration(); // start a new generation + } finally { + lock.unlock(); + } + } + + /** + * Returns the number of parties currently waiting at the barrier. + * This method is primarily useful for debugging and assertions. + * + * @return the number of parties currently blocked in {@link #await} + */ + public int getNumberWaiting() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return parties - count; + } finally { + lock.unlock(); + } + } +}