1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/rt/emul/compact/src/main/java/java/util/concurrent/CyclicBarrier.java Sat Mar 19 10:46:31 2016 +0100
1.3 @@ -0,0 +1,483 @@
1.4 +/*
1.5 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1.6 + *
1.7 + * This code is free software; you can redistribute it and/or modify it
1.8 + * under the terms of the GNU General Public License version 2 only, as
1.9 + * published by the Free Software Foundation. Oracle designates this
1.10 + * particular file as subject to the "Classpath" exception as provided
1.11 + * by Oracle in the LICENSE file that accompanied this code.
1.12 + *
1.13 + * This code is distributed in the hope that it will be useful, but WITHOUT
1.14 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1.15 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1.16 + * version 2 for more details (a copy is included in the LICENSE file that
1.17 + * accompanied this code).
1.18 + *
1.19 + * You should have received a copy of the GNU General Public License version
1.20 + * 2 along with this work; if not, write to the Free Software Foundation,
1.21 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1.22 + *
1.23 + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
1.24 + * or visit www.oracle.com if you need additional information or have any
1.25 + * questions.
1.26 + */
1.27 +
1.28 +/*
1.29 + * This file is available under and governed by the GNU General Public
1.30 + * License version 2 only, as published by the Free Software Foundation.
1.31 + * However, the following notice accompanied the original version of this
1.32 + * file:
1.33 + *
1.34 + * Written by Doug Lea with assistance from members of JCP JSR-166
1.35 + * Expert Group and released to the public domain, as explained at
1.36 + * http://creativecommons.org/publicdomain/zero/1.0/
1.37 + */
1.38 +
1.39 +package java.util.concurrent;
1.40 +import java.util.concurrent.locks.*;
1.41 +
1.42 +/**
1.43 + * A synchronization aid that allows a set of threads to all wait for
1.44 + * each other to reach a common barrier point. CyclicBarriers are
1.45 + * useful in programs involving a fixed sized party of threads that
1.46 + * must occasionally wait for each other. The barrier is called
1.47 + * <em>cyclic</em> because it can be re-used after the waiting threads
1.48 + * are released.
1.49 + *
1.50 + * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
1.51 + * that is run once per barrier point, after the last thread in the party
1.52 + * arrives, but before any threads are released.
1.53 + * This <em>barrier action</em> is useful
1.54 + * for updating shared-state before any of the parties continue.
1.55 + *
1.56 + * <p><b>Sample usage:</b> Here is an example of
1.57 + * using a barrier in a parallel decomposition design:
1.58 + * <pre>
1.59 + * class Solver {
1.60 + * final int N;
1.61 + * final float[][] data;
1.62 + * final CyclicBarrier barrier;
1.63 + *
1.64 + * class Worker implements Runnable {
1.65 + * int myRow;
1.66 + * Worker(int row) { myRow = row; }
1.67 + * public void run() {
1.68 + * while (!done()) {
1.69 + * processRow(myRow);
1.70 + *
1.71 + * try {
1.72 + * barrier.await();
1.73 + * } catch (InterruptedException ex) {
1.74 + * return;
1.75 + * } catch (BrokenBarrierException ex) {
1.76 + * return;
1.77 + * }
1.78 + * }
1.79 + * }
1.80 + * }
1.81 + *
1.82 + * public Solver(float[][] matrix) {
1.83 + * data = matrix;
1.84 + * N = matrix.length;
1.85 + * barrier = new CyclicBarrier(N,
1.86 + * new Runnable() {
1.87 + * public void run() {
1.88 + * mergeRows(...);
1.89 + * }
1.90 + * });
1.91 + * for (int i = 0; i < N; ++i)
1.92 + * new Thread(new Worker(i)).start();
1.93 + *
1.94 + * waitUntilDone();
1.95 + * }
1.96 + * }
1.97 + * </pre>
1.98 + * Here, each worker thread processes a row of the matrix then waits at the
1.99 + * barrier until all rows have been processed. When all rows are processed
1.100 + * the supplied {@link Runnable} barrier action is executed and merges the
1.101 + * rows. If the merger
1.102 + * determines that a solution has been found then <tt>done()</tt> will return
1.103 + * <tt>true</tt> and each worker will terminate.
1.104 + *
1.105 + * <p>If the barrier action does not rely on the parties being suspended when
1.106 + * it is executed, then any of the threads in the party could execute that
1.107 + * action when it is released. To facilitate this, each invocation of
1.108 + * {@link #await} returns the arrival index of that thread at the barrier.
1.109 + * You can then choose which thread should execute the barrier action, for
1.110 + * example:
1.111 + * <pre> if (barrier.await() == 0) {
1.112 + * // log the completion of this iteration
1.113 + * }</pre>
1.114 + *
1.115 + * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
1.116 + * for failed synchronization attempts: If a thread leaves a barrier
1.117 + * point prematurely because of interruption, failure, or timeout, all
1.118 + * other threads waiting at that barrier point will also leave
1.119 + * abnormally via {@link BrokenBarrierException} (or
1.120 + * {@link InterruptedException} if they too were interrupted at about
1.121 + * the same time).
1.122 + *
1.123 + * <p>Memory consistency effects: Actions in a thread prior to calling
1.124 + * {@code await()}
1.125 + * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
1.126 + * actions that are part of the barrier action, which in turn
1.127 + * <i>happen-before</i> actions following a successful return from the
1.128 + * corresponding {@code await()} in other threads.
1.129 + *
1.130 + * @since 1.5
1.131 + * @see CountDownLatch
1.132 + *
1.133 + * @author Doug Lea
1.134 + */
1.135 +public class CyclicBarrier {
1.136 + /**
1.137 + * Each use of the barrier is represented as a generation instance.
1.138 + * The generation changes whenever the barrier is tripped, or
1.139 + * is reset. There can be many generations associated with threads
1.140 + * using the barrier - due to the non-deterministic way the lock
1.141 + * may be allocated to waiting threads - but only one of these
1.142 + * can be active at a time (the one to which <tt>count</tt> applies)
1.143 + * and all the rest are either broken or tripped.
1.144 + * There need not be an active generation if there has been a break
1.145 + * but no subsequent reset.
1.146 + */
1.147 + private static class Generation {
1.148 + boolean broken = false;
1.149 + }
1.150 +
1.151 + /** The lock for guarding barrier entry */
1.152 + private final ReentrantLock lock = new ReentrantLock();
1.153 + /** Condition to wait on until tripped */
1.154 + private final Condition trip = lock.newCondition();
1.155 + /** The number of parties */
1.156 + private final int parties;
1.157 + /* The command to run when tripped */
1.158 + private final Runnable barrierCommand;
1.159 + /** The current generation */
1.160 + private Generation generation = new Generation();
1.161 +
1.162 + /**
1.163 + * Number of parties still waiting. Counts down from parties to 0
1.164 + * on each generation. It is reset to parties on each new
1.165 + * generation or when broken.
1.166 + */
1.167 + private int count;
1.168 +
1.169 + /**
1.170 + * Updates state on barrier trip and wakes up everyone.
1.171 + * Called only while holding lock.
1.172 + */
1.173 + private void nextGeneration() {
1.174 + // signal completion of last generation
1.175 + trip.signalAll();
1.176 + // set up next generation
1.177 + count = parties;
1.178 + generation = new Generation();
1.179 + }
1.180 +
1.181 + /**
1.182 + * Sets current barrier generation as broken and wakes up everyone.
1.183 + * Called only while holding lock.
1.184 + */
1.185 + private void breakBarrier() {
1.186 + generation.broken = true;
1.187 + count = parties;
1.188 + trip.signalAll();
1.189 + }
1.190 +
1.191 + /**
1.192 + * Main barrier code, covering the various policies.
1.193 + */
1.194 + private int dowait(boolean timed, long nanos)
1.195 + throws InterruptedException, BrokenBarrierException,
1.196 + TimeoutException {
1.197 + final ReentrantLock lock = this.lock;
1.198 + lock.lock();
1.199 + try {
1.200 + final Generation g = generation;
1.201 +
1.202 + if (g.broken)
1.203 + throw new BrokenBarrierException();
1.204 +
1.205 + if (Thread.interrupted()) {
1.206 + breakBarrier();
1.207 + throw new InterruptedException();
1.208 + }
1.209 +
1.210 + int index = --count;
1.211 + if (index == 0) { // tripped
1.212 + boolean ranAction = false;
1.213 + try {
1.214 + final Runnable command = barrierCommand;
1.215 + if (command != null)
1.216 + command.run();
1.217 + ranAction = true;
1.218 + nextGeneration();
1.219 + return 0;
1.220 + } finally {
1.221 + if (!ranAction)
1.222 + breakBarrier();
1.223 + }
1.224 + }
1.225 +
1.226 + // loop until tripped, broken, interrupted, or timed out
1.227 + for (;;) {
1.228 + try {
1.229 + if (!timed)
1.230 + trip.await();
1.231 + else if (nanos > 0L)
1.232 + nanos = trip.awaitNanos(nanos);
1.233 + } catch (InterruptedException ie) {
1.234 + if (g == generation && ! g.broken) {
1.235 + breakBarrier();
1.236 + throw ie;
1.237 + } else {
1.238 + // We're about to finish waiting even if we had not
1.239 + // been interrupted, so this interrupt is deemed to
1.240 + // "belong" to subsequent execution.
1.241 + Thread.currentThread().interrupt();
1.242 + }
1.243 + }
1.244 +
1.245 + if (g.broken)
1.246 + throw new BrokenBarrierException();
1.247 +
1.248 + if (g != generation)
1.249 + return index;
1.250 +
1.251 + if (timed && nanos <= 0L) {
1.252 + breakBarrier();
1.253 + throw new TimeoutException();
1.254 + }
1.255 + }
1.256 + } finally {
1.257 + lock.unlock();
1.258 + }
1.259 + }
1.260 +
1.261 + /**
1.262 + * Creates a new <tt>CyclicBarrier</tt> that will trip when the
1.263 + * given number of parties (threads) are waiting upon it, and which
1.264 + * will execute the given barrier action when the barrier is tripped,
1.265 + * performed by the last thread entering the barrier.
1.266 + *
1.267 + * @param parties the number of threads that must invoke {@link #await}
1.268 + * before the barrier is tripped
1.269 + * @param barrierAction the command to execute when the barrier is
1.270 + * tripped, or {@code null} if there is no action
1.271 + * @throws IllegalArgumentException if {@code parties} is less than 1
1.272 + */
1.273 + public CyclicBarrier(int parties, Runnable barrierAction) {
1.274 + if (parties <= 0) throw new IllegalArgumentException();
1.275 + this.parties = parties;
1.276 + this.count = parties;
1.277 + this.barrierCommand = barrierAction;
1.278 + }
1.279 +
1.280 + /**
1.281 + * Creates a new <tt>CyclicBarrier</tt> that will trip when the
1.282 + * given number of parties (threads) are waiting upon it, and
1.283 + * does not perform a predefined action when the barrier is tripped.
1.284 + *
1.285 + * @param parties the number of threads that must invoke {@link #await}
1.286 + * before the barrier is tripped
1.287 + * @throws IllegalArgumentException if {@code parties} is less than 1
1.288 + */
1.289 + public CyclicBarrier(int parties) {
1.290 + this(parties, null);
1.291 + }
1.292 +
1.293 + /**
1.294 + * Returns the number of parties required to trip this barrier.
1.295 + *
1.296 + * @return the number of parties required to trip this barrier
1.297 + */
1.298 + public int getParties() {
1.299 + return parties;
1.300 + }
1.301 +
1.302 + /**
1.303 + * Waits until all {@linkplain #getParties parties} have invoked
1.304 + * <tt>await</tt> on this barrier.
1.305 + *
1.306 + * <p>If the current thread is not the last to arrive then it is
1.307 + * disabled for thread scheduling purposes and lies dormant until
1.308 + * one of the following things happens:
1.309 + * <ul>
1.310 + * <li>The last thread arrives; or
1.311 + * <li>Some other thread {@linkplain Thread#interrupt interrupts}
1.312 + * the current thread; or
1.313 + * <li>Some other thread {@linkplain Thread#interrupt interrupts}
1.314 + * one of the other waiting threads; or
1.315 + * <li>Some other thread times out while waiting for barrier; or
1.316 + * <li>Some other thread invokes {@link #reset} on this barrier.
1.317 + * </ul>
1.318 + *
1.319 + * <p>If the current thread:
1.320 + * <ul>
1.321 + * <li>has its interrupted status set on entry to this method; or
1.322 + * <li>is {@linkplain Thread#interrupt interrupted} while waiting
1.323 + * </ul>
1.324 + * then {@link InterruptedException} is thrown and the current thread's
1.325 + * interrupted status is cleared.
1.326 + *
1.327 + * <p>If the barrier is {@link #reset} while any thread is waiting,
1.328 + * or if the barrier {@linkplain #isBroken is broken} when
1.329 + * <tt>await</tt> is invoked, or while any thread is waiting, then
1.330 + * {@link BrokenBarrierException} is thrown.
1.331 + *
1.332 + * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
1.333 + * then all other waiting threads will throw
1.334 + * {@link BrokenBarrierException} and the barrier is placed in the broken
1.335 + * state.
1.336 + *
1.337 + * <p>If the current thread is the last thread to arrive, and a
1.338 + * non-null barrier action was supplied in the constructor, then the
1.339 + * current thread runs the action before allowing the other threads to
1.340 + * continue.
1.341 + * If an exception occurs during the barrier action then that exception
1.342 + * will be propagated in the current thread and the barrier is placed in
1.343 + * the broken state.
1.344 + *
1.345 + * @return the arrival index of the current thread, where index
1.346 + * <tt>{@link #getParties()} - 1</tt> indicates the first
1.347 + * to arrive and zero indicates the last to arrive
1.348 + * @throws InterruptedException if the current thread was interrupted
1.349 + * while waiting
1.350 + * @throws BrokenBarrierException if <em>another</em> thread was
1.351 + * interrupted or timed out while the current thread was
1.352 + * waiting, or the barrier was reset, or the barrier was
1.353 + * broken when {@code await} was called, or the barrier
1.354 + * action (if present) failed due an exception.
1.355 + */
1.356 + public int await() throws InterruptedException, BrokenBarrierException {
1.357 + try {
1.358 + return dowait(false, 0L);
1.359 + } catch (TimeoutException toe) {
1.360 + throw new Error(toe); // cannot happen;
1.361 + }
1.362 + }
1.363 +
1.364 + /**
1.365 + * Waits until all {@linkplain #getParties parties} have invoked
1.366 + * <tt>await</tt> on this barrier, or the specified waiting time elapses.
1.367 + *
1.368 + * <p>If the current thread is not the last to arrive then it is
1.369 + * disabled for thread scheduling purposes and lies dormant until
1.370 + * one of the following things happens:
1.371 + * <ul>
1.372 + * <li>The last thread arrives; or
1.373 + * <li>The specified timeout elapses; or
1.374 + * <li>Some other thread {@linkplain Thread#interrupt interrupts}
1.375 + * the current thread; or
1.376 + * <li>Some other thread {@linkplain Thread#interrupt interrupts}
1.377 + * one of the other waiting threads; or
1.378 + * <li>Some other thread times out while waiting for barrier; or
1.379 + * <li>Some other thread invokes {@link #reset} on this barrier.
1.380 + * </ul>
1.381 + *
1.382 + * <p>If the current thread:
1.383 + * <ul>
1.384 + * <li>has its interrupted status set on entry to this method; or
1.385 + * <li>is {@linkplain Thread#interrupt interrupted} while waiting
1.386 + * </ul>
1.387 + * then {@link InterruptedException} is thrown and the current thread's
1.388 + * interrupted status is cleared.
1.389 + *
1.390 + * <p>If the specified waiting time elapses then {@link TimeoutException}
1.391 + * is thrown. If the time is less than or equal to zero, the
1.392 + * method will not wait at all.
1.393 + *
1.394 + * <p>If the barrier is {@link #reset} while any thread is waiting,
1.395 + * or if the barrier {@linkplain #isBroken is broken} when
1.396 + * <tt>await</tt> is invoked, or while any thread is waiting, then
1.397 + * {@link BrokenBarrierException} is thrown.
1.398 + *
1.399 + * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
1.400 + * waiting, then all other waiting threads will throw {@link
1.401 + * BrokenBarrierException} and the barrier is placed in the broken
1.402 + * state.
1.403 + *
1.404 + * <p>If the current thread is the last thread to arrive, and a
1.405 + * non-null barrier action was supplied in the constructor, then the
1.406 + * current thread runs the action before allowing the other threads to
1.407 + * continue.
1.408 + * If an exception occurs during the barrier action then that exception
1.409 + * will be propagated in the current thread and the barrier is placed in
1.410 + * the broken state.
1.411 + *
1.412 + * @param timeout the time to wait for the barrier
1.413 + * @param unit the time unit of the timeout parameter
1.414 + * @return the arrival index of the current thread, where index
1.415 + * <tt>{@link #getParties()} - 1</tt> indicates the first
1.416 + * to arrive and zero indicates the last to arrive
1.417 + * @throws InterruptedException if the current thread was interrupted
1.418 + * while waiting
1.419 + * @throws TimeoutException if the specified timeout elapses
1.420 + * @throws BrokenBarrierException if <em>another</em> thread was
1.421 + * interrupted or timed out while the current thread was
1.422 + * waiting, or the barrier was reset, or the barrier was broken
1.423 + * when {@code await} was called, or the barrier action (if
1.424 + * present) failed due an exception
1.425 + */
1.426 + public int await(long timeout, TimeUnit unit)
1.427 + throws InterruptedException,
1.428 + BrokenBarrierException,
1.429 + TimeoutException {
1.430 + return dowait(true, unit.toNanos(timeout));
1.431 + }
1.432 +
1.433 + /**
1.434 + * Queries if this barrier is in a broken state.
1.435 + *
1.436 + * @return {@code true} if one or more parties broke out of this
1.437 + * barrier due to interruption or timeout since
1.438 + * construction or the last reset, or a barrier action
1.439 + * failed due to an exception; {@code false} otherwise.
1.440 + */
1.441 + public boolean isBroken() {
1.442 + final ReentrantLock lock = this.lock;
1.443 + lock.lock();
1.444 + try {
1.445 + return generation.broken;
1.446 + } finally {
1.447 + lock.unlock();
1.448 + }
1.449 + }
1.450 +
1.451 + /**
1.452 + * Resets the barrier to its initial state. If any parties are
1.453 + * currently waiting at the barrier, they will return with a
1.454 + * {@link BrokenBarrierException}. Note that resets <em>after</em>
1.455 + * a breakage has occurred for other reasons can be complicated to
1.456 + * carry out; threads need to re-synchronize in some other way,
1.457 + * and choose one to perform the reset. It may be preferable to
1.458 + * instead create a new barrier for subsequent use.
1.459 + */
1.460 + public void reset() {
1.461 + final ReentrantLock lock = this.lock;
1.462 + lock.lock();
1.463 + try {
1.464 + breakBarrier(); // break the current generation
1.465 + nextGeneration(); // start a new generation
1.466 + } finally {
1.467 + lock.unlock();
1.468 + }
1.469 + }
1.470 +
1.471 + /**
1.472 + * Returns the number of parties currently waiting at the barrier.
1.473 + * This method is primarily useful for debugging and assertions.
1.474 + *
1.475 + * @return the number of parties currently blocked in {@link #await}
1.476 + */
1.477 + public int getNumberWaiting() {
1.478 + final ReentrantLock lock = this.lock;
1.479 + lock.lock();
1.480 + try {
1.481 + return parties - count;
1.482 + } finally {
1.483 + lock.unlock();
1.484 + }
1.485 + }
1.486 +}