前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。


内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。

?public?class??{
?????//Synchronization?control?For?CountDownLatch.
?????//Uses?AQS?state?to?represent?count.
????private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{
????????private?static?final?long?serialVersionUID?=?4982264981922014374L;

????????Sync(int?count)?{
????????????setState(count);
????????}

????????int?getCount()?{
????????????return?getState();
????????}

????????protected?int?tryAcquireShared(int?acquires)?{
????????????return?(getState()?==?0)???1?:?-1;
????????}

????????protected?boolean?tryReleaseShared(int?releases)?{
????????????//?Decrement?count;?signal?when?transition?to?zero
????????????for?(;;)?{
????????????????int?c?=?getState();
????????????????if?(c?==?0)
????????????????????return?false;
????????????????int?nextc?=?c-1;
????????????????if?(compareAndSetState(c,?nextc))
????????????????????return?nextc?==?0;
????????????}
????????}
????}

????private?final?Sync?sync;
????...?...//
?}
?public?class?CyclicBarrier?{
????/**
?????*?Each?use?of?the?barrier?is?represented?as?a?generation?instance.
?????*?The?generation?changes?whenever?the?barrier?is?tripped,?or
?????*?is?reset.?There?can?be?many?generations?associated?with?threads
?????*?using?the?barrier?-?due?to?the?non-deterministic?way?the?lock
?????*?may?be?allocated?to?waiting?threads?-?but?only?one?of?these
?????*?can?be?active?at?a?time?(the?one?to?which?{@code?count}?applies)
?????*?and?all?the?rest?are?either?broken?or?tripped.
?????*?There?need?not?be?an?active?generation?if?there?has?been?a?break
?????*?but?no?subsequent?reset.
?????*/
????private?static?class?Generation?{
????????boolean?broken?=?false;
????}

????/**?The?lock?for?guarding?barrier?entry?*/
????private?final?ReentrantLock?lock?=?new?ReentrantLock();
????/**?Condition?to?wait?on?until?tripped?*/
????private?final?Condition?trip?=?lock.newCondition();
????/**?The?number?of?parties?*/
????private?final?int?parties;
????/*?The?command?to?run?when?tripped?*/
????private?final?Runnable?barrierCommand;
????/**?The?current?generation?*/
????private?Generation?generation?=?new?Generation();

????/**
?????*?Number?of?parties?still?waiting.?Counts?down?from?parties?to?0
?????*?on?each?generation.??It?is?reset?to?parties?on?each?new
?????*?generation?or?when?broken.
?????*/
????private?int?count;

????/**
?????*?Updates?state?on?barrier?trip?and?wakes?up?everyone.
?????*?Called?only?while?holding?lock.
?????*/
????private?void?nextGeneration()?{
????????//?signal?completion?of?last?generation
????????trip.signalAll();
????????//?set?up?next?generation
????????count?=?parties;
????????generation?=?new?Generation();
????}

????/**
?????*?Sets?current?barrier?generation?as?broken?and?wakes?up?everyone.
?????*?Called?only?while?holding?lock.
?????*/
????private?void?breakBarrier()?{
????????generation.broken?=?true;
????????count?=?parties;
????????trip.signalAll();
????}

????/**
?????*?Main?barrier?code,?covering?the?various?policies.
?????*/
????private?int?dowait(boolean?timed,?long?nanos)
????????throws?InterruptedException,?BrokenBarrierException,
???????????????TimeoutException?{
????????final?ReentrantLock?lock?=?this.lock;
????????lock.lock();
????????try?{
????????????final?Generation?g?=?generation;

????????????if?(g.broken)
????????????????throw?new?BrokenBarrierException();

????????????if?(Thread.interrupted())?{
????????????????breakBarrier();
????????????????throw?new?InterruptedException();
????????????}

????????????int?index?=?--count;
????????????if?(index?==?0)?{??//?tripped
????????????????boolean?ranAction?=?false;
????????????????try?{
????????????????????final?Runnable?command?=?barrierCommand;
????????????????????if?(command?!=?null)
????????????????????????command.run();
????????????????????ranAction?=?true;
????????????????????nextGeneration();
????????????????????return?0;
????????????????}?finally?{
????????????????????if?(!ranAction)
????????????????????????breakBarrier();
????????????????}
????????????}

????????????//?loop?until?tripped,?broken,?interrupted,?or?timed?out
????????????for?(;;)?{
????????????????try?{
????????????????????if?(!timed)
????????????????????????trip.await();
????????????????????else?if?(nanos?>?0L)
????????????????????????nanos?=?trip.awaitNanos(nanos);
????????????????}?catch?(InterruptedException?ie)?{
????????????????????if?(g?==?generation?&&?!?g.broken)?{
????????????????????????breakBarrier();
????????????????????????throw?ie;
????????????????????}?else?{
????????????????????????//?We're?about?to?finish?waiting?even?if?we?had?not
????????????????????????//?been?interrupted,?so?this?interrupt?is?deemed?to
????????????????????????//?"belong"?to?subsequent?execution.
????????????????????????Thread.currentThread().interrupt();
????????????????????}
????????????????}

????????????????if?(g.broken)
????????????????????throw?new?BrokenBarrierException();

????????????????if?(g?!=?generation)
????????????????????return?index;

????????????????if?(timed?&&?nanos?<=?0L)?{
????????????????????breakBarrier();
????????????????????throw?new?TimeoutException();
????????????????}
????????????}
????????}?finally?{
????????????lock.unlock();
????????}
????}
????...?...?//
?}


实战 - 展示各自的使用场景


/**
?*类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
?*/
public?class?UseCountDownLatch?{
???
????static?CountDownLatch?latch?=?new?CountDownLatch(6);

????/*初始化线程*/
????private?static?class?InitThread?implements?Runnable{

????????public?void?run()?{
???????????System.out.println("Thread_"+Thread.currentThread().getId()
?????????????????+"?ready?init?work......");
????????????latch.countDown();
????????????for(int?i?=0;i<2;i++)?{
???????????????System.out.println("Thread_"+Thread.currentThread().getId()
?????????????????????+"?........continue?do?its?work");
????????????}
????????}
????}

????/*业务线程等待latch的计数器为0完成*/
????private?static?class?BusiThread?implements?Runnable{

????????public?void?run()?{
????????????try?{
????????????????latch.await();
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????for(int?i?=0;i<3;i++)?{
???????????????System.out.println("BusiThread_"+Thread.currentThread().getId()
?????????????????????+"?do?business-----");
????????????}
????????}
????}

????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????new?Thread(new?Runnable()?{
????????????public?void?run()?{
???????????????SleepTools.ms(1);
????????????????System.out.println("Thread_"+Thread.currentThread().getId()
?????????????????????+"?ready?init?work?step?1st......");
????????????????latch.countDown();
????????????????System.out.println("begin?step?2nd.......");
????????????????SleepTools.ms(1);
????????????????System.out.println("Thread_"+Thread.currentThread().getId()
?????????????????????+"?ready?init?work?step?2nd......");
????????????????latch.countDown();
????????????}
????????}).start();
????????new?Thread(new?BusiThread()).start();
????????for(int?i=0;i<=3;i++){
????????????Thread?thread?=?new?Thread(new?InitThread());
????????????thread.start();
????????}
????????latch.await();
????????System.out.println("Main?do?ites?work........");
????}
}
/**
?*类说明:共4个子线程,他们全部完成工作后,交出自己结果,
?*再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
?*/
class?UseCyclicBarrier?{
????private?static?CyclicBarrier?barrier
????????????=?new?CyclicBarrier(4,new?CollectThread());

????//存放子线程工作结果的容器
????private?static?ConcurrentHashMap<String,Long>?resultMap
????????????=?new?ConcurrentHashMap<String,Long>();

????public?static?void?main(String[]?args)?{
????????for(int?i=0;i<4;i++){
????????????Thread?thread?=?new?Thread(new?SubThread());
????????????thread.start();
????????}

????}

????/*汇总的任务*/
????private?static?class?CollectThread?implements?Runnable{

????????@Override
????????public?void?run()?{
????????????StringBuilder?result?=?new?StringBuilder();
????????????for(Map.Entry<String,Long>?workResult:resultMap.entrySet()){
???????????????result.append("["+workResult.getValue()+"]");
????????????}
????????????System.out.println("?the?result?=?"+?result);
????????????System.out.println("do?other?business........");
????????}
????}

????/*相互等待的子线程*/
????private?static?class?SubThread?implements?Runnable{
????????@Override
????????public?void?run()?{
???????????long?id?=?Thread.currentThread().getId();
????????????resultMap.put(Thread.currentThread().getId()+"",id);
????????????try?{
???????????????????Thread.sleep(1000+id);
???????????????????System.out.println("Thread_"+id+"?....do?something?");
????????????????barrier.await();
???????????????Thread.sleep(1000+id);
????????????????System.out.println("Thread_"+id+"?....do?its?business?");
????????????????barrier.await();
????????????}?catch?(Exception?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
}


?两者总结

1.?Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

2.?New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

3.?协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

4.?从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

5.?countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

6.? ? ?就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;