AtomicInteger
解析:http://donald-draper.iteye.com/blog/2359555
锁持有者管理器AbstractOwnableSynchronizer:http://donald-draper.iteye.com/blog/2360109
AQS
线程挂起辅助类LockSupport:http://donald-draper.iteye.com/blog/2360206
AQS详解-CLH
队列,线程等待状态:http://donald-draper.iteye.com/blog/2360256
AQS-Condition详解:http://donald-draper.iteye.com/blog/2360381
可重入锁ReentrantLock详解:http://donald-draper.iteye.com/blog/2360411
CountDownLatch使用场景:http://donald-draper.iteye.com/blog/2348106
CountDownLatch详解:http://donald-draper.iteye.com/blog/2360597
class="java">package java.util.concurrent;
import java.util.concurrent.locks.*;
/**
* A synchronization aid that allows a set of threads to all wait for
* each other to reach a common barrier point. CyclicBarriers are
* useful in programs involving a fixed sized party of threads that
* must occasionally wait for each other. The barrier is called
* [i]cyclic[/i] because it can be re-used after the waiting threads
* are released.
*
同步工具CyclicBarrier,一个集合线程,等待每一个线程达到共同的屏障点。
CyclicBarriers对一个复杂的线程集合必须互相等待完成任务,场景非常有用。
同步工具的屏障可以循环利用,因为在所有等待线程释放锁时,他可以被重新使用。
* <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
* that is run once per barrier point, after the last thread in the party
* arrives, but before any threads are released.
* This [i]barrier action[/i] is useful
* for updating shared-state before any of the parties continue.
*
CyclicBarrier的构造函数中,有一个带Runnable,在所有线程到达屏障点,并且共享锁没有完全释放,
这个功能,对于在其他线程继续执行任务前,更新共享状态非常有用。
* <p><b>Sample usage:</b> Here is an example of
* using a barrier in a parallel decomposition design:
* <pre>
简单的一个实例用,在并行的分解任务中,使用barrier
* class Solver {
* final int N;
* final float[][] data;
* final CyclicBarrier barrier;
*
* class Worker implements Runnable {
* int myRow;
* Worker(int row) { myRow = row; }
* public void run() {
* while (!done()) {
* processRow(myRow);
*
* try {
* barrier.await();
* } catch (InterruptedException ex) {
* return;
* } catch (BrokenBarrierException ex) {
* return;
* }
* }
* }
* }
*
* public Solver(float[][] matrix) {
* data = matrix;
* N = matrix.length;
* barrier = new CyclicBarrier(N,
* new Runnable() {
* public void run() {
* mergeRows(...);
* }
* });
* for (int i = 0; i < N; ++i)
* new Thread(new Worker(i)).start();
*
* waitUntilDone();
* }
* }
* </pre>
* Here, each worker thread processes a row of the matrix then waits at the
* barrier until all rows have been processed. When all rows are processed
* the supplied {@link Runnable} barrier action is executed and merges the
* rows. If the merger
* determines that a solution has been found then <tt>done()</tt> will return
* <tt>true</tt> and each worker will terminate.
上述实例,描述的每个线程处理矩阵的每一行数据,当线程处理完一行数据时,等待其他线程处理完各自
的一行数据。当所有的线程处理完各自行数据时,屏障点线程Runnable,执行合并矩阵的行数据。
当屏障点线程Runnable,决定执行合并是,每个线程的done函数返回true,结束每个线程工作。
* <p>If the barrier action does not rely on the parties being suspended when
* it is executed, then any of the threads in the party could execute that
* action when it is released. To facilitate this, each invocation of
* {@link #await} returns the arrival index of that thread at the barrier.
* You can then choose which thread should execute the barrier action, for
* example:
屏障点action动作线程的执行,不能依赖于组线程中将要暂定的线程,分组中的每一个线程,都可以
执行action,在共享锁被释放之前。为了优化action的执行,我们可以利用,在每个线程调用await方法时,
返回线程到达屏障点的index,来决定,那个线程执行屏障动作。
* <pre> if (barrier.await() == 0) {
//最后一个到达屏障点的线程,执行屏障action
* // log the completion of this iteration
* }</pre>
*
* <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
* for failed synchronization attempts: If a thread leaves a barrier
* point prematurely because of interruption, failure, or timeout, all
* other threads waiting at that barrier point will also leave
* abnormally via {@link BrokenBarrierException} (or
* {@link InterruptedException} if they too were interrupted at about
* the same time).
*
CyclicBarrier对于失败同步的尝试,用all-or-none breakage model:
如果一个线程,因为中断,失败,超时,永久的离开屏障点,那么其他在屏障点等待的线程,
通过BrokenBarrierException,abnormally离开。
* <p>Memory consistency effects: Actions in a thread prior to calling
* {@code await()}
* [url=package-summary.html#MemoryVisibility]<i>happen-before</i>[/url]
* actions that are part of the barrier action, which in turn
* <i>happen-before</i> actions following a successful return from the
* corresponding {@code await()} in other threads.
*
内存一致性:actions优先call await函数,这个基于内存可见机制-happen-before法则。
屏障点的分组线程,返回happen-before,协调分组线程工作的线程,await的成功返回。
* @since 1.5
* @see CountDownLatch
*
* @author Doug Lea
*/
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which <tt>count</tt> applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
每次屏障点,表示一代实例。当屏障点被打开或者重置时,generation将会改变。
由于锁以不确定的方式,分配给等待线程,线程可以多代屏障点的方式,使用barrier。
如果线程组存在break,并且没有reset,则不需要激活一代。
Generation可以这么理解,当有线程有多个分组,一个分组执行完,执行下一组;每一组
我们可以理解为Generation,当线程组出现break,且没有reset,则Generation不会被激活。
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
屏障点保护锁
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
条件等待,直到所有的线程打开锁,
private final Condition trip = lock.newCondition();
/** The number of parties */
共享锁数量
private final int parties;
/* The command to run when tripped */
障碍点执行的命令
private final Runnable barrierCommand;
/** The current generation */
当前代
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
表示分组中,还有多少个在等待。在每一代,count从parties to 0。
在每一次创建新生代中或broken时,count重置为parties
private int count;
}
先看构造:
/**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*常见一个屏障点,当所有parties线程在等待时,将会打开,同时最后一个进入
屏障点的线程,将会执行barrierAction。
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
线程代broken处理
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
当线程持有锁,设置当前线程代broken,唤醒当前代线程
private void breakBarrier() {
//
generation.broken = true;
//重置共享锁状态
count = parties;
//唤醒所有在屏障点,等待的线程
trip.signalAll();
}
创建下一代
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
线程持有锁,更新屏障点状态,唤醒所有等待,线程
private void nextGeneration() {
// signal completion of last generation
//唤醒上一代,完成的线程
trip.signalAll();
// set up next generation
//重置共享锁状态
count = parties;
//创建下一代
generation = new Generation();
}
}
我们来看屏障等待
/**
* Waits until all {@linkplain #getParties parties} have invoked
* <tt>await</tt> on this barrier.
* 等待所享有的线程到达屏障点
* <p>If the current thread is not the last to arrive then it is
* disabled for thread scheduling purposes and lies dormant until
* one of the following things happens:
当线程不是最后一个到达屏障点,线程将会不会被调度,直到以下情况发生
* [list]
* <li>The last thread arrives; or最后一个线程到达屏障点
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or其他线程中断当前线程
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* one of the other waiting threads; or其他等待线程,被中断
* <li>Some other thread times out while waiting for barrier; or
* <li>Some other thread invokes {@link #reset} on this barrier.
* [/list]
*一些线程等待屏障点超时,或其他以下线程调用reset
* <p>If the current thread:
* [list]
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* [/list]
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
当前线程带着中断状态,在等待屏障点,当中断异常抛出时,当前线程中断消除。
* <p>If the barrier is {@link #reset} while any thread is waiting,
* or if the barrier {@linkplain #isBroken is broken} when
* <tt>await</tt> is invoked, or while any thread is waiting, then
* {@link BrokenBarrierException} is thrown.
*当其他线程在等待,如果屏障点被重置,或broke,则抛出BrokenBarrierException
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
* then all other waiting threads will throw
* {@link BrokenBarrierException} and the barrier is placed in the broken
* state.
*在等待的过程中,如果其他线程中断,则抛出BrokenBarrierException,屏障点
设置为broken状态。
* <p>If the current thread is the last thread to arrive, and a
* non-null barrier action was supplied in the constructor, then the
* current thread runs the action before allowing the other threads to
* continue.
如果当前线程,是最后一个到达屏障点的,如果屏障点动作线程不为null,
则执行action,在下一代线程组执行任务前。
* If an exception occurs during the barrier action then that exception
* will be propagated in the current thread and the barrier is placed in
* the broken state.
*如果在执行action的过程中,出现异常,则当前线程将会抛出异常,屏障点处于破位状态
* @return the arrival index of the current thread, where index
* <tt>{@link #getParties()} - 1</tt> indicates the first
* to arrive and zero indicates the last to arrive
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws BrokenBarrierException if [i]another[/i] thread was
* interrupted or timed out while the current thread was
* waiting, or the barrier was reset, or the barrier was
* broken when {@code await} was called, or the barrier
* action (if present) failed due an exception.
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
//委托给dowait
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取线程代
final Generation g = generation;
//如果屏障点破位,则抛出BrokenBarrierException
if (g.broken)
throw new BrokenBarrierException();
//如果线程中断,则设置屏障点破位,重置count为parties,
//唤醒所有在屏障点,等待的线程,抛出中断异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//共享锁数量,自减
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
//如果所有线程达到屏障点,则执行action
command.run();
ranAction = true;
//创建一下代
nextGeneration();
//返回0,屏障点解除
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//自旋,直到所有线程到达屏障点,当前代broken,中断,或超时
for (;;) {
try {
//非超时等待await,否则awaitNanos
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
//如果超时,解除屏障点
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
小节:
线程到达屏障点时,首先检查线程代,有没有broken,如果broken,
则抛出BrokenBarrierException,如果线程中断,则当前代broken,
重置共享锁状态,唤醒所有等待线程。如果上述条件不满足,则释放
count,判断是否当前代线程,是否都到达屏障点,如果是,判断action
是否为null,不为null,则执行action;当释放count,当前代线程,仍有在执行的,
自旋等待屏障点条件trip,如果是超时等待,则判断时间是否超时,超时则breakBarrier。
再看
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
与await基本相同,都是委托给dowait
/**
* Returns the number of parties required to trip this barrier.
*
* @return the number of parties required to trip this barrier
*/
public int getParties() {
return parties;
}
/**
* Queries if this barrier is in a broken state.
*
* @return {@code true} if one or more parties broke out of this
* barrier due to interruption or timeout since
* construction or the last reset, or a barrier action
* failed due to an exception; {@code false} otherwise.
*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets [i]after[/i]
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
/**
* Returns the number of parties currently waiting at the barrier.
* This method is primarily useful for debugging and assertions.
*返回在屏障点等待线程数
* @return the number of parties currently blocked in {@link #await}
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
总结:
屏障点思想,当每个线程完成任务时,自旋等待条件Condition trip,释放共享锁,count减1;当线程代的最后一个线程到达屏障点时,唤醒线程代中所有等待的线程,
如果有action,执行action,然后创建下一代线程。如果在线程代未结束之前,有等待线程中断或超时,则结束当前代,唤醒所有等待线程,重置count为parties。