CountDownLatchCyclicBarrierSem
一、CountDownLatch:1、什么是CountDownLatch:
CountDownLatch,闭锁,就是一个基于AQS共享模式的同步计数器,它内部的方法都是围绕AQS实现的。主要作用是使一个或一组线程在其他线程执行完毕之前,一直处于等待状态,直到其他线程执行完成后再继续执行。
CountDownLatch利用AQS的state变量充当计数器(由volatile修饰并使用CAS进行更新的),计数器的初始值就是线程的数量,每当一个线程执行完成,计数器的值就会减一,当计数器的值为0时,表示所有的线程都已经完成任务了,那么接下来就唤醒在CountDownLatch上等待的线程执行后面的任务。
那么当计数器的值为0时,主线程是如何被唤醒的呢?这就要从CountDownLatch的工作流程来说明了,CountDownLatch的工作流程可以看成在一开始只在CLH队列中放入一个主线程,然后不停的唤醒,唤醒之后如果发现state还是不为0,则继续等待。而主线程什么时候会被唤醒呢?当每个子线程执行完毕的时候,会调用countDown()并基于CAS将计数器state的值减一,减一成功释放资源后,就会调用unparkSuccessor()唤醒主线程,当所有的子线程都执行完了,也就是state为0时,这时候主线程被唤醒之后就可以继续执行了。
state被减成了0之后,就无法继续使用这个CountDownLatch了,需要重新new一个,因为state的数量只有在初始化CountDownLatch的时候才可以设置,这也是CountDownLatch不可重用的原因。2、CountDownLatch的源码简单说明:
从代码层面上来看,CountDownLatch基于内部类Sync实现,而Sync继承自AQS。CountDownLatch最主要有两个方法:await()和countDown()await():调用该方法的线程会被挂起,直到CountDownLatch计数器的值为0才继续执行,底层使用的是AQS的tryAcquireShared()countDown():用于减少计数器的数量,如果计数减为0的话,就会唤醒主线程,底层使用的是AQS的releaseShared()
countDown()方法详细流程:
参考文章:CountDownLatch原理与应用:https:juejin。cnpost6900368854472458248二、CyclicBarrier:1、什么是CyclicBarrier:
CyclicBarrier,循环栅栏,通过CyclicBarrier可以实现一组线程之间的相互等待,当所有线程都到达屏障点之后再执行后续的操作。通过await()方法可以实现等待,当最后一个线程执行完,会使得所有在相应CyclicBarrier实例上的等待的线程被唤醒,而最后一个线程自身不会被暂停。
CyclicBarrier没有像CountDownLatch和ReentrantLock使用AQS的state变量,它是直接借助ReentrantLock加上Condition等待唤醒的功能进而实现的。在构建CyclicBarrier的时候,传入的值会赋值给CyclicBarrier内部维护的变量count,同时也会赋值给parties变量(这是可以复用的关键)。
线程调用await()表示线程已经到达栅栏,每次调用await()时,会将count减一,操作count值是直接使用ReentrantLock来保证线程安全性的,如果count不为0,则添加到condition队列中,如果count等于0,则把节点从condition队列中移除并添加到AQS队列中进行全部唤醒,并且将parties的值重新赋值给count从而实现复用。2、CyclicBarrier的源码分析:
(1)成员变量:同步操作锁privatefinalReentrantLocklocknewReentrantLock();线程拦截器privatefinalConditiontriplock。newCondition();每次拦截的线程数privatefinalintparties;换代前执行的任务privatefinalRunnablebarrierCommand;表示栅栏的当前代privateGenerationgenerationnewGeneration();计数器privateintcount;静态内部类GenerationprivatestaticclassGeneration{booleanbrokenfalse;}
CyclicBarrier是通过独占锁实现的,底层包含了ReentrantLock对象lock和Condition对象trip,通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count:parties表示每次拦截的线程数,该值在构造时进行赋值,用于实现CyclicBarrier的复用;count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。
CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,利用它可以实现循环等待,当count减为0会将所有阻塞的线程唤醒,并设置成下一代。
barrierCommand表示换代前执行的任务,在唤醒所有线程前可以通过barrierCommand来执行指定的任务
(2)await()方法:
CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。非定时等待publicintawait()throwsInterruptedException,BrokenBarrierException{try{returndowait(false,0L);}catch(TimeoutExceptiontoe){thrownewError(toe);}}定时等待publicintawait(longtimeout,TimeUnitunit)throwsInterruptedException,BrokenBarrierException,TimeoutException{returndowait(true,unit。toNanos(timeout));}
BrokenBarrierException表示栅栏已经被破坏,破坏的原因可能是其中一个线程await()时被中断或者超时。
可以看到不管是定时等待还是非定时等待,它们都调用了dowait()方法,只不过是传入的参数不同而已,下面我们就来看看dowait()方法都做了些什么。核心等待方法privateintdowait(booleantimed,longnanos)throwsInterruptedException,BrokenBarrierException,TimeoutException{显示锁finalReentrantLocklockthis。lock;lock。lock();try{finalGenerationggeneration;检查当前栅栏是否被打翻if(g。broken){thrownewBrokenBarrierException();}检查当前线程是否被中断if(Thread。interrupted()){如果当前线程被中断会做以下三件事1。打翻当前栅栏2。唤醒拦截的所有线程3。抛出中断异常breakBarrier();thrownewInterruptedException();}每次都将计数器的值减1intindexcount;计数器的值减为0则需唤醒所有线程并转换到下一代if(index0){booleanranActionfalse;try{唤醒所有线程前先执行指定的任务finalRunnablecommandbarrierCommand;if(command!null){command。run();}ranActiontrue;唤醒所有线程并转到下一代nextGeneration();return0;}finally{确保在任务未成功执行时能将所有线程唤醒if(!ranAction){breakBarrier();}}}如果计数器不为0则执行此循环for(;;){try{根据传入的参数来决定是定时等待还是非定时等待if(!timed){trip。await();}elseif(nanos0L){nanostrip。awaitNanos(nanos);}}catch(InterruptedExceptionie){若当前线程在等待期间被中断则打翻栅栏唤醒其他线程if(ggeneration!g。broken){breakBarrier();throwie;}else{若在捕获中断异常前已经完成在栅栏上的等待,则直接调用中断操作Thread。currentThread()。interrupt();}}如果线程因为打翻栅栏操作而被唤醒则抛出异常if(g。broken){thrownewBrokenBarrierException();}如果线程因为换代操作而被唤醒则返回计数器的值if(g!generation){returnindex;}如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常if(timednanos0L){breakBarrier();thrownewTimeoutException();}}}finally{lock。unlock();}}
上面执行的代码相对比较容易看懂,我们再来看一下执行流程:
执行dowait()方法时,先获得显示锁,判断当前线程状态是否被中断,如果是,则执行breakBarrier()方法,唤醒之前阻塞的所有线程,并将计数器重置,否则,往下执行;计数器count减1,如果count0,表示最后一个线程达到栅栏,接着执行之前指定的Runnable接口,同时执行nextGeneration()方法进入下一代;否则,进入自旋,判断当前线程是进入定时等待还是非定时等待,如果在等待过程中被中断,执行breakBarrier()方法,唤醒之前阻塞的所有线程;判断是否是因为执行breakBarrier()方法而被唤醒,如果是,则抛出异常;判断是否是正常的换代操作而被唤醒,如果是,则返回计数器的值;判断是否是超时而被唤醒,如果是,则唤醒之前阻塞的所有线程,并抛出异常;释放锁。
(3)breakBarrier()方法:privatevoidbreakBarrier(){generation。brokentrue;栅栏被打破countparties;重置counttrip。signalAll();唤醒之前阻塞的线程}
(4)nextGeneration()方法:privatevoidnextGeneration(){唤醒所以的线程trip。signalAll();重置计数器countparties;重新开始generationnewGeneration();}
(5)reset()方法:重置barrier到初始状态,所有还在等待中的线程最终会抛出BrokenBarrierException。publicvoidreset(){finalReentrantLocklockthis。lock;lock。lock();try{breakBarrier();breakthecurrentgenerationnextGeneration();startanewgeneration}finally{lock。unlock();}}
参考文章:CyclicBarrier原理与使用:https:juejin。cnpost6900548129645395982三、Semaphore:1、什么是Semaphore:
Semaphore信号量,主要用于控制并发访问共享资源的线程数量,底层基于AQS共享模式,并依赖AQS的变量state作为许可证permit,通过控制许可证的数量,来保证线程之间的配合。线程使用acquire()获取访问许可,只有拿到许可证后才能继续运行,当Semaphore的permit不为0的时候,对请求资源的线程放行,同时permit的值减1,当permit的值为0时,那么请求资源的线程会被阻塞直到其他线程释放访问许可,当线程对共享资源操作完成后,使用release()归还访问许可。不同于CyclicBarrier和ReentrantLock,Semaphore不会使用到AQS的Condition条件队列,都是在CLH同步队列中操作,只是当前线程会被park。另外Semaphore是不可重入的。2、Semaphore的公平和非公平两种模式:
Semaphore通过自定义两种不同的同步器(FairSync和NonfairSync)提供了公平和非公平两种工作模式,两种模式下分别提供了限时不限时、响应中断不响应中断的获取资源的方法(限时获取总是及时响应中断的),而所有的释放资源的release()操作是统一的。公平模式:遵循FIFO,调用acquire()方法获取许可证的顺序时,先判断同步队列中是不是存在其他的等待线程,如果存在就将请求线程封装成Node结点加入同步队列,从而保证每个线程获取同步状态都是按照先到先得的顺序执行的,否则对state值进行减操作并返回剩下的信号量非公平模式:是抢占式的,通过竞争的方式获取,不管同步队列中是否存在等待线程,有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。
框架流程图如下:
3、尝试获取资源acquire()方法的执行流程图:
参考文章:Semaphore源码解析:https:juejin。cnpost6899744477427007495
作者:张维鹏
来源:blog。csdn。neta745233700articledetails120688546
点击关注,带你了解更多