rt/emul/compact/src/main/java/java/util/concurrent/CyclicBarrier.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branch jdk7-b147
changeset 1890 212417b74b72
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 import java.util.concurrent.locks.*;
    38 
    39 /**
    40  * A synchronization aid that allows a set of threads to all wait for
    41  * each other to reach a common barrier point.  CyclicBarriers are
    42  * useful in programs involving a fixed sized party of threads that
    43  * must occasionally wait for each other. The barrier is called
    44  * <em>cyclic</em> because it can be re-used after the waiting threads
    45  * are released.
    46  *
    47  * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
    48  * that is run once per barrier point, after the last thread in the party
    49  * arrives, but before any threads are released.
    50  * This <em>barrier action</em> is useful
    51  * for updating shared-state before any of the parties continue.
    52  *
    53  * <p><b>Sample usage:</b> Here is an example of
    54  *  using a barrier in a parallel decomposition design:
    55  * <pre>
    56  * class Solver {
    57  *   final int N;
    58  *   final float[][] data;
    59  *   final CyclicBarrier barrier;
    60  *
    61  *   class Worker implements Runnable {
    62  *     int myRow;
    63  *     Worker(int row) { myRow = row; }
    64  *     public void run() {
    65  *       while (!done()) {
    66  *         processRow(myRow);
    67  *
    68  *         try {
    69  *           barrier.await();
    70  *         } catch (InterruptedException ex) {
    71  *           return;
    72  *         } catch (BrokenBarrierException ex) {
    73  *           return;
    74  *         }
    75  *       }
    76  *     }
    77  *   }
    78  *
    79  *   public Solver(float[][] matrix) {
    80  *     data = matrix;
    81  *     N = matrix.length;
    82  *     barrier = new CyclicBarrier(N,
    83  *                                 new Runnable() {
    84  *                                   public void run() {
    85  *                                     mergeRows(...);
    86  *                                   }
    87  *                                 });
    88  *     for (int i = 0; i < N; ++i)
    89  *       new Thread(new Worker(i)).start();
    90  *
    91  *     waitUntilDone();
    92  *   }
    93  * }
    94  * </pre>
    95  * Here, each worker thread processes a row of the matrix then waits at the
    96  * barrier until all rows have been processed. When all rows are processed
    97  * the supplied {@link Runnable} barrier action is executed and merges the
    98  * rows. If the merger
    99  * determines that a solution has been found then <tt>done()</tt> will return
   100  * <tt>true</tt> and each worker will terminate.
   101  *
   102  * <p>If the barrier action does not rely on the parties being suspended when
   103  * it is executed, then any of the threads in the party could execute that
   104  * action when it is released. To facilitate this, each invocation of
   105  * {@link #await} returns the arrival index of that thread at the barrier.
   106  * You can then choose which thread should execute the barrier action, for
   107  * example:
   108  * <pre>  if (barrier.await() == 0) {
   109  *     // log the completion of this iteration
   110  *   }</pre>
   111  *
   112  * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
   113  * for failed synchronization attempts: If a thread leaves a barrier
   114  * point prematurely because of interruption, failure, or timeout, all
   115  * other threads waiting at that barrier point will also leave
   116  * abnormally via {@link BrokenBarrierException} (or
   117  * {@link InterruptedException} if they too were interrupted at about
   118  * the same time).
   119  *
   120  * <p>Memory consistency effects: Actions in a thread prior to calling
   121  * {@code await()}
   122  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
   123  * actions that are part of the barrier action, which in turn
   124  * <i>happen-before</i> actions following a successful return from the
   125  * corresponding {@code await()} in other threads.
   126  *
   127  * @since 1.5
   128  * @see CountDownLatch
   129  *
   130  * @author Doug Lea
   131  */
   132 public class CyclicBarrier {
   133     /**
   134      * Each use of the barrier is represented as a generation instance.
   135      * The generation changes whenever the barrier is tripped, or
   136      * is reset. There can be many generations associated with threads
   137      * using the barrier - due to the non-deterministic way the lock
   138      * may be allocated to waiting threads - but only one of these
   139      * can be active at a time (the one to which <tt>count</tt> applies)
   140      * and all the rest are either broken or tripped.
   141      * There need not be an active generation if there has been a break
   142      * but no subsequent reset.
   143      */
   144     private static class Generation {
   145         boolean broken = false;
   146     }
   147 
   148     /** The lock for guarding barrier entry */
   149     private final ReentrantLock lock = new ReentrantLock();
   150     /** Condition to wait on until tripped */
   151     private final Condition trip = lock.newCondition();
   152     /** The number of parties */
   153     private final int parties;
   154     /* The command to run when tripped */
   155     private final Runnable barrierCommand;
   156     /** The current generation */
   157     private Generation generation = new Generation();
   158 
   159     /**
   160      * Number of parties still waiting. Counts down from parties to 0
   161      * on each generation.  It is reset to parties on each new
   162      * generation or when broken.
   163      */
   164     private int count;
   165 
   166     /**
   167      * Updates state on barrier trip and wakes up everyone.
   168      * Called only while holding lock.
   169      */
   170     private void nextGeneration() {
   171         // signal completion of last generation
   172         trip.signalAll();
   173         // set up next generation
   174         count = parties;
   175         generation = new Generation();
   176     }
   177 
   178     /**
   179      * Sets current barrier generation as broken and wakes up everyone.
   180      * Called only while holding lock.
   181      */
   182     private void breakBarrier() {
   183         generation.broken = true;
   184         count = parties;
   185         trip.signalAll();
   186     }
   187 
   188     /**
   189      * Main barrier code, covering the various policies.
   190      */
   191     private int dowait(boolean timed, long nanos)
   192         throws InterruptedException, BrokenBarrierException,
   193                TimeoutException {
   194         final ReentrantLock lock = this.lock;
   195         lock.lock();
   196         try {
   197             final Generation g = generation;
   198 
   199             if (g.broken)
   200                 throw new BrokenBarrierException();
   201 
   202             if (Thread.interrupted()) {
   203                 breakBarrier();
   204                 throw new InterruptedException();
   205             }
   206 
   207            int index = --count;
   208            if (index == 0) {  // tripped
   209                boolean ranAction = false;
   210                try {
   211                    final Runnable command = barrierCommand;
   212                    if (command != null)
   213                        command.run();
   214                    ranAction = true;
   215                    nextGeneration();
   216                    return 0;
   217                } finally {
   218                    if (!ranAction)
   219                        breakBarrier();
   220                }
   221            }
   222 
   223             // loop until tripped, broken, interrupted, or timed out
   224             for (;;) {
   225                 try {
   226                     if (!timed)
   227                         trip.await();
   228                     else if (nanos > 0L)
   229                         nanos = trip.awaitNanos(nanos);
   230                 } catch (InterruptedException ie) {
   231                     if (g == generation && ! g.broken) {
   232                         breakBarrier();
   233                         throw ie;
   234                     } else {
   235                         // We're about to finish waiting even if we had not
   236                         // been interrupted, so this interrupt is deemed to
   237                         // "belong" to subsequent execution.
   238                         Thread.currentThread().interrupt();
   239                     }
   240                 }
   241 
   242                 if (g.broken)
   243                     throw new BrokenBarrierException();
   244 
   245                 if (g != generation)
   246                     return index;
   247 
   248                 if (timed && nanos <= 0L) {
   249                     breakBarrier();
   250                     throw new TimeoutException();
   251                 }
   252             }
   253         } finally {
   254             lock.unlock();
   255         }
   256     }
   257 
   258     /**
   259      * Creates a new <tt>CyclicBarrier</tt> that will trip when the
   260      * given number of parties (threads) are waiting upon it, and which
   261      * will execute the given barrier action when the barrier is tripped,
   262      * performed by the last thread entering the barrier.
   263      *
   264      * @param parties the number of threads that must invoke {@link #await}
   265      *        before the barrier is tripped
   266      * @param barrierAction the command to execute when the barrier is
   267      *        tripped, or {@code null} if there is no action
   268      * @throws IllegalArgumentException if {@code parties} is less than 1
   269      */
   270     public CyclicBarrier(int parties, Runnable barrierAction) {
   271         if (parties <= 0) throw new IllegalArgumentException();
   272         this.parties = parties;
   273         this.count = parties;
   274         this.barrierCommand = barrierAction;
   275     }
   276 
   277     /**
   278      * Creates a new <tt>CyclicBarrier</tt> that will trip when the
   279      * given number of parties (threads) are waiting upon it, and
   280      * does not perform a predefined action when the barrier is tripped.
   281      *
   282      * @param parties the number of threads that must invoke {@link #await}
   283      *        before the barrier is tripped
   284      * @throws IllegalArgumentException if {@code parties} is less than 1
   285      */
   286     public CyclicBarrier(int parties) {
   287         this(parties, null);
   288     }
   289 
   290     /**
   291      * Returns the number of parties required to trip this barrier.
   292      *
   293      * @return the number of parties required to trip this barrier
   294      */
   295     public int getParties() {
   296         return parties;
   297     }
   298 
   299     /**
   300      * Waits until all {@linkplain #getParties parties} have invoked
   301      * <tt>await</tt> on this barrier.
   302      *
   303      * <p>If the current thread is not the last to arrive then it is
   304      * disabled for thread scheduling purposes and lies dormant until
   305      * one of the following things happens:
   306      * <ul>
   307      * <li>The last thread arrives; or
   308      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
   309      * the current thread; or
   310      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
   311      * one of the other waiting threads; or
   312      * <li>Some other thread times out while waiting for barrier; or
   313      * <li>Some other thread invokes {@link #reset} on this barrier.
   314      * </ul>
   315      *
   316      * <p>If the current thread:
   317      * <ul>
   318      * <li>has its interrupted status set on entry to this method; or
   319      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
   320      * </ul>
   321      * then {@link InterruptedException} is thrown and the current thread's
   322      * interrupted status is cleared.
   323      *
   324      * <p>If the barrier is {@link #reset} while any thread is waiting,
   325      * or if the barrier {@linkplain #isBroken is broken} when
   326      * <tt>await</tt> is invoked, or while any thread is waiting, then
   327      * {@link BrokenBarrierException} is thrown.
   328      *
   329      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
   330      * then all other waiting threads will throw
   331      * {@link BrokenBarrierException} and the barrier is placed in the broken
   332      * state.
   333      *
   334      * <p>If the current thread is the last thread to arrive, and a
   335      * non-null barrier action was supplied in the constructor, then the
   336      * current thread runs the action before allowing the other threads to
   337      * continue.
   338      * If an exception occurs during the barrier action then that exception
   339      * will be propagated in the current thread and the barrier is placed in
   340      * the broken state.
   341      *
   342      * @return the arrival index of the current thread, where index
   343      *         <tt>{@link #getParties()} - 1</tt> indicates the first
   344      *         to arrive and zero indicates the last to arrive
   345      * @throws InterruptedException if the current thread was interrupted
   346      *         while waiting
   347      * @throws BrokenBarrierException if <em>another</em> thread was
   348      *         interrupted or timed out while the current thread was
   349      *         waiting, or the barrier was reset, or the barrier was
   350      *         broken when {@code await} was called, or the barrier
   351      *         action (if present) failed due an exception.
   352      */
   353     public int await() throws InterruptedException, BrokenBarrierException {
   354         try {
   355             return dowait(false, 0L);
   356         } catch (TimeoutException toe) {
   357             throw new Error(toe); // cannot happen;
   358         }
   359     }
   360 
   361     /**
   362      * Waits until all {@linkplain #getParties parties} have invoked
   363      * <tt>await</tt> on this barrier, or the specified waiting time elapses.
   364      *
   365      * <p>If the current thread is not the last to arrive then it is
   366      * disabled for thread scheduling purposes and lies dormant until
   367      * one of the following things happens:
   368      * <ul>
   369      * <li>The last thread arrives; or
   370      * <li>The specified timeout elapses; or
   371      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
   372      * the current thread; or
   373      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
   374      * one of the other waiting threads; or
   375      * <li>Some other thread times out while waiting for barrier; or
   376      * <li>Some other thread invokes {@link #reset} on this barrier.
   377      * </ul>
   378      *
   379      * <p>If the current thread:
   380      * <ul>
   381      * <li>has its interrupted status set on entry to this method; or
   382      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
   383      * </ul>
   384      * then {@link InterruptedException} is thrown and the current thread's
   385      * interrupted status is cleared.
   386      *
   387      * <p>If the specified waiting time elapses then {@link TimeoutException}
   388      * is thrown. If the time is less than or equal to zero, the
   389      * method will not wait at all.
   390      *
   391      * <p>If the barrier is {@link #reset} while any thread is waiting,
   392      * or if the barrier {@linkplain #isBroken is broken} when
   393      * <tt>await</tt> is invoked, or while any thread is waiting, then
   394      * {@link BrokenBarrierException} is thrown.
   395      *
   396      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
   397      * waiting, then all other waiting threads will throw {@link
   398      * BrokenBarrierException} and the barrier is placed in the broken
   399      * state.
   400      *
   401      * <p>If the current thread is the last thread to arrive, and a
   402      * non-null barrier action was supplied in the constructor, then the
   403      * current thread runs the action before allowing the other threads to
   404      * continue.
   405      * If an exception occurs during the barrier action then that exception
   406      * will be propagated in the current thread and the barrier is placed in
   407      * the broken state.
   408      *
   409      * @param timeout the time to wait for the barrier
   410      * @param unit the time unit of the timeout parameter
   411      * @return the arrival index of the current thread, where index
   412      *         <tt>{@link #getParties()} - 1</tt> indicates the first
   413      *         to arrive and zero indicates the last to arrive
   414      * @throws InterruptedException if the current thread was interrupted
   415      *         while waiting
   416      * @throws TimeoutException if the specified timeout elapses
   417      * @throws BrokenBarrierException if <em>another</em> thread was
   418      *         interrupted or timed out while the current thread was
   419      *         waiting, or the barrier was reset, or the barrier was broken
   420      *         when {@code await} was called, or the barrier action (if
   421      *         present) failed due an exception
   422      */
   423     public int await(long timeout, TimeUnit unit)
   424         throws InterruptedException,
   425                BrokenBarrierException,
   426                TimeoutException {
   427         return dowait(true, unit.toNanos(timeout));
   428     }
   429 
   430     /**
   431      * Queries if this barrier is in a broken state.
   432      *
   433      * @return {@code true} if one or more parties broke out of this
   434      *         barrier due to interruption or timeout since
   435      *         construction or the last reset, or a barrier action
   436      *         failed due to an exception; {@code false} otherwise.
   437      */
   438     public boolean isBroken() {
   439         final ReentrantLock lock = this.lock;
   440         lock.lock();
   441         try {
   442             return generation.broken;
   443         } finally {
   444             lock.unlock();
   445         }
   446     }
   447 
   448     /**
   449      * Resets the barrier to its initial state.  If any parties are
   450      * currently waiting at the barrier, they will return with a
   451      * {@link BrokenBarrierException}. Note that resets <em>after</em>
   452      * a breakage has occurred for other reasons can be complicated to
   453      * carry out; threads need to re-synchronize in some other way,
   454      * and choose one to perform the reset.  It may be preferable to
   455      * instead create a new barrier for subsequent use.
   456      */
   457     public void reset() {
   458         final ReentrantLock lock = this.lock;
   459         lock.lock();
   460         try {
   461             breakBarrier();   // break the current generation
   462             nextGeneration(); // start a new generation
   463         } finally {
   464             lock.unlock();
   465         }
   466     }
   467 
   468     /**
   469      * Returns the number of parties currently waiting at the barrier.
   470      * This method is primarily useful for debugging and assertions.
   471      *
   472      * @return the number of parties currently blocked in {@link #await}
   473      */
   474     public int getNumberWaiting() {
   475         final ReentrantLock lock = this.lock;
   476         lock.lock();
   477         try {
   478             return parties - count;
   479         } finally {
   480             lock.unlock();
   481         }
   482     }
   483 }