读CyclicBarrier源码_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 读CyclicBarrier源码

读CyclicBarrier源码

 2017/9/1 19:08:47  红领巾丶  程序员俱乐部  我要评论(0)
  • 摘要://一个循环的屏障。所有的线程在屏障处等待其他线程执行完毕。然后再各自执行。//先看构造函数publicCyclicBarrier(intparties){this(parties,null);}//barrierAction代表在屏障上等待的最后一个线程已经执行完后,执行的runnablepublicCyclicBarrier(intparties,RunnablebarrierAction){if(parties<=0)thrownewIllegalArgumentException
  • 标签:源码
class="java">
//一个循环的屏障。所有的线程在屏障处等待其他线程执行完毕。然后再各自执行。
//先看构造函数
 public CyclicBarrier(int parties) {
        this(parties, null);
    }


//barrierAction代表在屏障上等待的最后一个线程已经执行完后,执行的runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }


//等待其他线程执行到这
 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }


//如果count-1不为0则线程在trip条件上等待。否则唤醒在trip条件上等待的线程。
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();
            //如果线程已经被中断了
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;
	   //如果已经全部执行完了
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
	           //执行runnable
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
		        //在trip条件上等待
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

 //中断此barrier将所有在trip条件上等待的线程加入唤醒的队列
 private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }


 //重新生成下一代CyclicBarrier
 private void nextGeneration() {
        //唤醒在trip上等待的所有线程        
        trip.signalAll();
        // set up next generation
	//设置下一代操作(重用CyclicBarrier)
        count = parties;
        generation = new Generation();
    }

 //在此时间段等待
 public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

 //是否已经被损坏
 public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

//返回要求启动此CyclicBarrier的参与者数量。
public int getParties() {
        return parties;
    }

//还有几个参与者在等待
public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

//损坏旧屏障。创建新一代
 public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
发表评论
用户名: 匿名