一:前言 1。1:什么是线程 在讲些线程池前,我们需要说一下什么是线程,因为线程是进程中得一个实体,线程本身是不会独立存在的。进程是代码在数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,线程则是进程的一个执行路径,一个进程中至少有一线程,进程中的多个线程共享进程的资源。 1。2:线程池的作用【降低线程的创建销毁开销】【提供一种资源限制和管理的手段】二:ThreadPoolExecutor构造函数publicThreadPoolExecutor(intcorePoolSize,核心线程数intmaximumPoolSize,最大线程限制longkeepAliveTime,空闲存活时间TimeUnitunit,时间单位BlockingQueueRunnableworkQueue,任务队列ThreadFactorythreadFactory,线程工厂拒绝策略RejectedExecutionHandlerhandler){判断参数是否越界if(corePoolSize0maximumPoolSize0maximumPoolSizecorePoolSizekeepAliveTime0)thrownewIllegalArgumentException();if(workQueuenullthreadFactorynullhandlernull)thrownewNullPointerException();this。accSystem。getSecurityManager()null?null:AccessController。getContext();this。corePoolSizecorePoolSize;this。maximumPoolSizemaximumPoolSize;this。workQueueworkQueue;this。keepAliveTimeunit。toNanos(keepAliveTime);this。threadFactorythreadFactory;this。handlerhandler;} 线程池构造参数入参含义:corePoolSize:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;keepAliveTime:线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;unit:时间单位workQueue:用于保存等执行的任务的阻塞队列,比如基于数组的有界队列ArrayBlockingQueue、基于链表的无界LinkedBlockingQueue、和最多只有一个元素的同步队列SynchronousQueue等threadFactory:创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。handler:线程池的拒绝策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略: 1、AbortPolicy:直接抛出异常,默认策略; 2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4、DiscardPolicy:直接丢弃任务; 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义拒绝策略,如记录日志或持久化存储不能处理的任务三:线程池工作原理 通过上述构造函数实例化ThreadPoolExecutor后,当调用execute,首先会判断当线程池中线程数量是否小于核心线程数,如小于则创建线程。如果此时核心线程数已经等于corePoolSize,则将提交的任务存放到workQueue队列中。当队列已经存满,会进行线程创建,创建失败则会执行具体的拒绝策略。(线程池在运行中的前提下) 四:线程池运行状态 线程池运行状态转换,可以使用ThreadPoolExecutorshutdown()和shutdownNow()两个方法进行改变。两者的区别在于shutdown执行会将线程池状态设置为SHOTDOWN,然后当线程池线程数量和工作队列为空才会进入TIDYING状态。而shutdownNow执行后线程池状态会设置为STOP,然后当线程池线程数量为空时才会进入到TIDYING状态。如图: 五:源码分析 5。1:关键属性、方法含义说明高3位:表示线程池的运行状态除去高3位之后的低位:表示当前线程池中所拥有的线程数量privatefinalAtomicIntegerctlnewAtomicInteger(ctlOf(RUNNING,0));表示在ctl,低COUNTBITS位是用于存放当前线程数量的位privatestaticfinalintCOUNTBITSInteger。SIZE3;低COUNTBITS位所能表达的最大数值0001111。。111privatestaticfinalintCAPACITY(1COUNTBITS)1;runStateisstoredinthehighorderbits运行中1(111。。。111)左移29位,1110000。。00仍为负数privatestaticfinalintRUNNING1COUNTBITS;0左移29位privatestaticfinalintSHUTDOWN0COUNTBITS;001000。。00privatestaticfinalintSTOP1COUNTBITS;010000。。00privatestaticfinalintTIDYING2COUNTBITS;011000。。00privatestaticfinalintTERMINATED3COUNTBITS;获取当前线程池运行状态privatestaticintrunStateOf(intc){returncCAPACITY;}获取当前线程池线程数量privatestaticintworkerCountOf(intc){returncCAPACITY;}用在重置线程池ctl值时会用到rs表示线程池在状态wc表示worker数量(可以理解我线程池数量)privatestaticintctlOf(intrs,intwc){returnrswc;}比较当前线程池ctl所表示的状态是否小于某个状态privatestaticbooleanrunStateLessThan(intc,ints){returncs;}线程池全局锁增加worker减少worker时需要持有mainLock修改线程池运行状态时也需要privatefinalReentrantLockmainLocknewReentrantLock();包含池中所有工作线程的集合。仅在持有mainLock时访问。privatefinalHashSetWorkerworkersnewHashSetWorker();控制核心线程数量内的线程,是否可以被回收true可以false不可以privatevolatilebooleanallowCoreThreadTimeOut;任务队列,当线程池中线程达到核心线程数时再提交任务时就会直接提交到workQueueprivatefinalBlockingQueueRunnableworkQueue;条件队列(本人简单理解为线程进入到阻塞队列前的临时队列,主要作用就是为了辅助阻塞队列)privatefinalConditionterminationmainLock。newCondition(); 5。2:execute方法分析 execute方法的作用是提交任务command到线程池进行执行。publicvoidexecute(Runnablecommand){判断是否为空if(commandnull)thrownewNullPointerException();intcctl。get();workerCountOf(c)获取任当前线程数量条件成立代表线程数量小于核心线程数量此次提交新建一个线程,对应线程池多了一个线程if(workerCountOf(c)corePoolSize){if(addWorker(command,true))return;执行到这条语句说明addWorker一定是失败了失败原因:1。存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c)corePoolSize成立后其他线程也可能成立了,并向线程池中创建了worker。这个时候线程池中的核心线程已经达到2。当前线程池状态发生改变。RUNNINGSHUTDOWNSTOPTIDYINGTERMINATION当线程池非RUNNING状态时,addWorker(firstTask!null,truefalse)一定会失败SHUTDOWN状态下也有可能创建成功。前提firstTasknull而且当前queue不为空,特殊情况cctl。get();}执行到此处的情况1。当前线程池线程数已经达到corePoolSize2。addWorker失败条件成立:说明线程池处于running状态,workQueue。offer(command)则尝试将task放入到workQueue中if(isRunning(c)workQueue。offer(command)){再次获取ctl保存到recheckintrecheckctl。get();条件1:isRunning(recheck)成立说明你提交到队列之后,线程状态被外部线程给修改了,比如shutdown()shutdownNow()这种情况需要把刚刚提交的任务给删除掉条件2:remove(command)有可能失败成功:提交之后,线程池中的线程还未消费处理失败:提交之后,在shutdown()shutdownNow之前就被线程池中的线程给处理了if(!isRunning(recheck)remove(command))提交之后线程池状态非running且任务出队成功则走拒绝策略reject(command);elseif(workerCountOf(recheck)0)addWorker(null,false);}执行到这里说明核心线程创建和队列追加失败执行拒绝策略elseif(!addWorker(command,false))reject(command);} 具体执行链路图: 5。3:内部类Worker分析 在了解addWorker方法前,我们先来了解一下线程池内部类WorkerprivatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{state:0表示未被占用0时表示被占用,0时表示初始化状态这种情况下不能抢锁ExclusiveOwnerThread:表示独占锁的线程(后续分析AQS是展开)Worker内部封装的线程linux进程win线程finalThreadthread;假设firstTask不为空,那么当worker启动后内部的线程启动,会优先执行firstTask,当执行完firstTask回到queue中获取下一个人任务RunnablefirstTask;完成任务数量volatilelongcompletedTasks;firstTask可以为null,为null启动后回到queue中获取Worker(RunnablefirstTask){设置AQS独占模式为初始化状态,这个时候不能被抢占锁setState(1);inhibitinterruptsuntilrunWorkerthis。firstTaskfirstTask;使用线程工厂创建一个线程,并且将当前的worker指定为Runnable,也就是说当Thread启动的时候,会将worker。run()为入口this。threadgetThreadFactory()。newThread(this);}当worker启动时会执行run方法publicvoidrun(){ThreadPollExecutorrunWorker()这个是一核心方法需要重点看runWorker(this);} 5。4:addworker方法分析 上面execute方法三处调用了addWorker,通过上述方法和内部类Worker可知addWorker主要负责创建新的线程并执行任务。firstTask任务coretrue代表使用核心线程数进行线程的判断false代表使用最大线程privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:自旋判断线程池状态是否允许创建线程的事情for(;;){获取ctlintcctl。get();获取当前线程池状态intrsrunStateOf(c);if(rsSHUTDOWN!(rsSHUTDOWNfirstTasknull!workQueue。isEmpty()))当线程池状态大于等于SHUTDOWN但是队列中已经没有任务了或者rsSHUTDOWN且firstTask!nullreturnfalse;上面代码就是判断线程池状态是否允许添加线程内部自旋for(;;){获取当前线程池线程数量intwcworkerCountOf(c);条件1:wcCAPACITY永远不会成立条件2:wc(core?corePoolSize:maximumPoolSize判断线程数量限制if(wcCAPACITYwc(core?corePoolSize:maximumPoolSize))达到线程限制returnfalse;通过cas操作线程数1相当于申请了一块创建线程的令牌if(compareAndIncrementWorkerCount(c))直接跳出外部自旋来到了A处breakretry;CAS失败没有成功的申请到令牌cctl。get();if(runStateOf(c)!rs)调到外层循环continueretry;}}来到了A处说明获取到了创建线程指令表示创建的workerStarted是否已经启动,false未启动true启动booleanworkerStartedfalse;表示创建的worker是否添加到池子中默认false未添加true添加booleanworkerAddedfalse;Workerwnull;try{创建worker执行完后线程已经创建好了wnewWorker(firstTask);将新创建的worker节点的线程赋值给tfinalThreadtw。thread;t!null判断为例防止ThreadFactory实现有bug,因为ThreadFactory是一个接口谁都可以实现if(t!null){获取全局锁的引入finalReentrantLockmainLockthis。mainLock;持有全局锁可能会阻塞,直到获取成功为止,统一时刻操作线程池内部相关的操作都必须持有锁mainLock。lock();try{再次获取线程池运行状态intrsrunStateOf(ctl。get());条件1:rsSHUTDOWN代表当前线程为正常状态条件2:前置当前线程池不是running状态rsSHUTDOWNfirstTasknull线程池状态为SHUTDOWNfirstTask为null其实判断的就是SHUTDOWN的特殊情况if(rsSHUTDOWN(rsSHUTDOWNfirstTasknull)){t。isAlive()当线程start后线程isAlive会返回true防止ThreadFactory中直接startif(t。isAlive())thrownewIllegalThreadStateException();将创建的worker添加到线程池中workers。add(w);获取线程数量intsworkers。size();条件成立:说明当前线程数量是一个新高,更新largestPoolSizeif(slargestPoolSize)largestPoolSizes;表示线程已经追加到线程池中workerAddedtrue;}}finally{释放线程池全局锁mainLock。unlock();}添加成功启动线程条件失败:说明线程池在lock之前,线程池状态发生了变化导致添加失败if(workerAdded){t。start();workerStartedtrue;}}}finally{条件成立说明添加失败做一下清理操作if(!workerStarted)失败工作1。释放令牌2。将worker从集合中清理addWorkerFailed(w);}returnworkerStarted;} 使用ReentrantLock锁的一部分原因,是因为workers非线程安全 5。5:线程池内部类WorkerrunWorker分析 addWorker方法在添加woker到workers集合中后,会执行t。start();因为Worker重写了run方法,所以后许会执行到WorkerrunWorker。W就是启动的workerfinalvoidrunWorker(Workerw){wtt。threadThreadwtThread。currentThread();将初始化执行的task赋值给taskRunnabletaskw。firstTask;情况当前w。firstTask引用w。firstTasknull;调用原因:就是为了初始化workerstate0和exclusiveOwnerThreadnullw。unlock();allowinterrupts是否是突然退出true发生异常,当前线程是突然退出的,需要再特殊处理false正常退出booleancompletedAbruptlytrue;try{getTask这个是一个会阻塞线程的方法,getTask返回null说明当前线程需要执行结束逻辑while(task!null(taskgetTask())!null){加锁原因:shutDown时会判断当前worker状态,根据独占锁是否空闲来判断线程是否空闲w。lock();如果池正在停止,请确保线程被中断;如果没有,请确保线程没有中断。这需要在第二种情况下进行重新检查,以在清除中断的同时处理Shutdownowrace条件1:runStateAtLeast(ctl。get(),STOP)成立:说明当前线程处于STOPTIDYINGTERMINATION此时线程一定会给他一个中断信号情况1:runStateAtLeast(ctl。get(),STOP)!wt。isInterrupted()说明:当前线程池状态是》STOP且当前线程是中断状态,此时需要进入if里面,给当前线程一个中断情况2:runStateAtLeast(ctl。get(),STOP)false(Thread。interrupted()runStateAtLeast(ctl。get(),STOP)))!wt。isInterrupted())Thread。interrupted()获取中断状态,且设置中断为false,连续调用两次interrupted第二次一定返回falserunStateAtLeast(ctl。get(),STOP))大概率任然为false其实它在强制刷新线程中的中断标记为false,因为有可能上一次执行task时,业务代码里面将当前线程的中断标记位设置为了true,且没有处理,这里进行强制刷新一下不会影响到后面的task假设:Thread。interrupted()true且runStateAtLeast(ctl。get(),STOP))true仍然有可能发生,因为外部线程在第一次runStateAtLeast(ctl。get(),STOP))false后,有机会调用shutDown。shutDownNow方法,将线程池状态修改这个时候也会将当前线程中断标记位再次设置为中断状态if((runStateAtLeast(ctl。get(),STOP)(Thread。interrupted()runStateAtLeast(ctl。get(),STOP)))!wt。isInterrupted())wt。interrupt();try{钩子方法留给子类实现beforeExecute(wt,task);Throwablethrownnull;try{task。run();}catch(RuntimeExceptionx){thrownx;throwx;}catch(Errorx){thrownx;throwx;}catch(Throwablex){thrownx;thrownewError(x);}finally{钩子方法afterExecute(task,thrown);}}finally{将局部变量task设置为nulltasknull;更新worker完成任务数量w。completedTasks;worker处理完成一个任务会释放独占锁,然后再次去queue获取任务1。正常情况下会再次回到getTask()获取任务while(getTask!null。。)2。task。run是内部抛出异常了‘w。unlock();}}getTask方法返回null时,说明当前线程应该执行退出逻辑1。正常情况completedAbruptlyfalse;}finally{task。run()时抛出异常时,直接从w。unlock()调到这里正常退出:completedAbruptlyfalse异常退出:completedAbruptlytrueprocessWorkerExit(w,completedAbruptly);}} 执行任务期间加锁原因是为了避免在任务运行期间,其他线程调用了shutdown后正在执行的任务被中断(shutdown只会中断当前被阻塞挂起的线程) 5。6:getTask方法分析 gettask方法主要的作用就是从workQueue中获取待执行任务返回null的情况1。(rsSTOP)2。前置条件状态是SHUTDOWN且workQueue。isEmpty()3。线程池中的线程数量超过最大限制,会有一部分返回null4。线程池中的线程数超过核心线程数量,会有一部分线程超时后返回nullprivateRunnablegetTask(){booleantimedOutfalse;Didthelastpoll()timeout?for(;;){intcctl。get();获取线程池状态intrsrunStateOf(c);条件一rsSHUTDOWN说明线程非RUNNING状态可能是SHUTDOWNSTOP条件二(rsSTOPworkQueue。isEmpty()2。1rsSTOP说明当前的状态最毒也是STOP一定返回null2。2前置条件状态是SHUTDOWN且workQueue。isEmpty()成立说明当前线程池状态为SHUTDOWN且任务队列已经空返回nullrunWorker方法将会返回null的线程执行线程退出的逻辑if(rsSHUTDOWN(rsSTOPworkQueue。isEmpty())){使用cas将ctl减一decrementWorkerCount();returnnull;}执行到这里几种情况1。线程running状态2。线程池SHUTDOWN但是队列不为空此时可以创建线程intwcworkerCountOf(c);timed表示当前线程获取task时是否支持超时机制booleantimedallowCoreThreadTimeOutwccorePoolSize;条件一:(wcmaximumPoolSize(timedtimedOut)1。1:wcmaximumPoolSiz?setMaximumPoolSize()方法可能外部线程将线程池最大的线程数调小1。2:(timedtimedOut)条件成立当前线程使用poll方式获取task。上一次循环使用poll获取任务已经超时条件一为true表示线程可以被回收,达到回收标准条件二:(wc1workQueue。isEmpty())2。1:wc1条件成立:说明线程池中还有其他线程,当前线程可以直接回收,返回null2。2:workQueue。isEmpty()前置条件wc1成立:说明当前任务队列已经空了,可以放心退出if((wcmaximumPoolSize(timedtimedOut))(wc1workQueue。isEmpty())){if(compareAndDecrementWorkerCount(c))returnnull;continue;}try{Runnablertimed?workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS):workQueue。take();if(r!null)returnr;timedOuttrue;}catch(InterruptedExceptionretry){timedOutfalse;}}} 5。7:processWorkerExit线程退出分析 当线程调用getTask()无法再次获取任务,此时会进行线程销毁操作。privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){if(completedAbruptly)Ifabrupt,thenworkerCountwasntadjusteddecrementWorkerCount();获取全局锁finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{completedTaskCountw。completedTasks;workers。remove(w);}finally{mainLock。unlock();}尝试终止操作tryTerminate();获取最新ctlintcctl。get();条件成立:当前状态是running或者SHUTDOWNif(runStateLessThan(c,STOP)){线程正常退出if(!completedAbruptly){allowCoreThreadTimeOuttrue核心线程数内的线程也会超时被回收intminallowCoreThreadTimeOut?0:corePoolSize;if(min0!workQueue。isEmpty())min1;if(workerCountOf(c)min)return;replacementnotneeded}1。completedAbruptlytrue线程异常退出2。当前任务队列有任务,当前状态为RUNNING或SHUTDOWN3。当前线程数量《corePoolSize,维护线程池线程数addWorker(null,false);}} 5。8:tryTerminate分析 tryTerminate主要尝试线程池的中断操作。finalvoidtryTerminate(){for(;;){获取最新ctl值intcctl。get();判断线程池工作状态if(isRunning(c)runStateAtLeast(c,TIDYING)(runStateOf(c)SHUTDOWN!workQueue。isEmpty()))return;来到这里情况1。线程池状态》STOP2。线程池状态是SHUTDOWN且workQueue。isEmpty()if(workerCountOf(c)!0){中断一个空闲线程空闲线程在:queue。task()queue。poll()因为线程状态是》STOP(SHUTDOWN队列为空)最终调用addWorker会失败最终空闲线程都会执行退出,非空闲线程执行完当前TASK时也会调用tryTerminate有可能执行到这interruptIdleWorkers(ONLYONE);return;}执行到这里的线程此时workerCountOf(c)0finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{if(ctl。compareAndSet(c,ctlOf(TIDYING,0))){try{terminated();}finally{ctl。set(ctlOf(TERMINATED,0));唤醒调用awaitTermination()方法线程termination。signalAll();}return;}}finally{mainLock。unlock();}elseretryonfailedCAS}}六:总结 线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中得线程个数。通过线程池状态来控制任务的执行,每个worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销 作者:海边奔跑的蜗牛 链接:https:juejin。cnpost7079605753094340644