游戏电视苹果数码历史美丽
投稿投诉
美丽时装
彩妆资讯
历史明星
乐活安卓
数码常识
驾车健康
苹果问答
网络发型
电视车载
室内电影
游戏科学
音乐整形

彻底了解线程池

  我们开始今天的主题:线程池。线程池是面试中必问的八股文,我将涉及到到的问题分为3大类:基础使用线程池是什么?为什么要使用线程池?Executor框架是什么?Java提供了哪些线程池?实现原理线程池的底层原理是如何实现的?创建线程池的参数有哪些?线程池中的线程是什么时间创建的?系统设计如何合理的设置线程池的大小?如果服务器宕机,怎么处理队列中的任务?
  希望今天的内容能够帮你解答以上的问题。
  Tips:本文使用Java11源码进行分析;文章会在源码中添加注释,关键内容会有单独的分析。池化思想
  在你的编程生涯中,一定遇到过各种各样的池,如:数据库连接池,常量池,以及今天的线程池。无一例外,它们都是借助池化思想来管理计算机中的资源。
  维基百科中是这样描述池化的:
  Inresourcemanagement,poolingisthegroupingtogetherofresources(assets,equipment,personnel,effort,etc。)forthepurposesofmaximizingadvantageorminimizingrisktotheusers。Thetermisusedinfinance,computingandequipmentmanagement。
  池化指的是将资源汇聚到一起,以发挥优势或降低风险。
  接着来看维基百科中对池的描述:
  Incomputerscience,apoolisacollectionofresourcesthatarekept,inmemory,readytouse,ratherthanthememoryacquiredonuseandthememoryreleasedafterwards。Apoolclientrequestsaresourcefromthepoolandperformsdesiredoperationsonthereturnedresource。Whentheclientfinishesitsuseoftheresource,itisreturnedtothepoolratherthanreleasedandlost。
  计算机科学中的池,是内存中保存资源的集合,创建资源以备使用,停用时回收,而不是使用时创建,停用时丢弃。客户端从池中请求资源,并执行操作,当不再使用资源时,将资源归还到池中,而不是释放或丢弃。为什么要使用池?
  首先池是资源的集合,通过池可以实现对资源的统一管理;
  其次,池内存放已经创建并初始化的资源,使用时直接从池内获取,跳过了创建及初始化的过程,提高了响应速度;
  最后,资源使用完成后归还到池中,而非丢弃或销毁,提高资源的利用率。线程池
  池化思想的引入是为了解决资源管理中遇到的问题,而线程池正是借助池化思想实现的线程管理工具。那么线程池可以帮助我们解决哪些实际的问题呢?
  最直接的是控制线程的创建,不加以限制的创建线程会耗尽系统资源。不信的话你可以试试下面的代码:publicstaticvoidmain(String〔〕args){while(true){newThread((){})。start();}}复制代码
  Tips:卡顿警告
  其次,线程的创建和销毁是需要时间的,借助线程池可以有效的避免线程频繁的创建和销毁线程,提高程的序响应速度。
  问题解答:线程池是什么?为什么要使用线程池?Executor体系
  Java中提供了功能完善的Executor体系,用于实现线程池。先来了解下Executor体系中的核心成员间的关系:
  Executor体系的最顶层是Executor接口和ExecutorService接口,它们定义了Executor体系的核心功能。Executor接口
  Executor接口的注释:
  AnobjectthatexecutessubmittedRunnabletasks。Thisinterfaceprovidesawayofdecouplingtasksubmissionfromthemechanicsofhoweachtaskwillberun,includingdetailsofthreaduse,scheduling,etc。AnExecutorisnormallyusedinsteadofexplicitlycreatingthreads。
  Executor接口非常简单,只定义了execute方法,主要目的是将Runnable任务与执行机制(线程,调度任务等)解耦,提供了执行Runnable任务的方法。publicinterfaceExecutor{Executesthegivencommandatsometimeinthefuture。Thecommandmayexecuteinanewthread,inapooledthread,orinthecallingthread,atthediscretionofthe{codeExecutor}implementation。voidexecute(Runnablecommand);}复制代码ExecutorService接口
  ExecutorService接口继承了Executor接口,拓展了Executor接口的能力。ExecutorService接口的注释:
  AnExecutorthatprovidesmethodstomanageterminationandmethodsthatcanproduceaFuturefortrackingprogressofoneormoreasynchronoustasks。
  ExecutorService接口关键方法的声明:publicinterfaceExecutorServiceextendsExecutor{Initiatesanorderlyshutdowninwhichpreviouslysubmittedtasksareexecuted,butnonewtaskswillbeaccepted。Invocationhasnoadditionaleffectifalreadyshutdown。voidshutdown();Attemptstostopallactivelyexecutingtasks,haltstheprocessingofwaitingtasks,andreturnsalistofthetasksthatwereawaitingexecution。Thismethoddoesnotwaitforactivelyexecutingtaskstoterminate。Use{linkawaitTerminationawaitTermination}todothat。ListRunnableshutdownNow();booleanisShutdown();booleanisTerminated();Blocksuntilalltaskshavecompletedexecutionafterashutdownrequest,orthetimeoutoccurs,orthecurrentthreadisinterrupted,whicheverhappensfirst。booleanawaitTermination(longtimeout,TimeUnitunit)throwsInterruptedException;TFutureTsubmit(CallableTtask);TFutureTsubmit(Runnabletask,Tresult);SubmitsaRunnabletaskforexecutionandreturnsaFuturerepresentingthattask。TheFutures{codeget}methodwillreturn{codenull}uponemsuccessfulemcompletion。Futurelt;?submit(Runnabletask);}复制代码
  对关键方法做一个说明:继承自Executor接口:execute:执行Runnable任务;ExecutorService接口定义的方法:submit:执行Runnable或Callable任务,并返回Future;shutdown:允许已提交的任务执行完毕,但不接受新任务的关闭;shutdownNow:尝试关闭所有任务(正在等待执行),并返回等待执行的任务。
  Tips:其余方法建议阅读源码中的注释,即便是提到的4个方法,也要阅读注释。
  问题解答:Executor框架是什么?ThreadPoolExecutor核心流程
  Executor体系中,大家最熟悉的一定是ThreadPoolExecutor实现了,也是我们能够实现自定义线程池的基础。接下来逐步分析ThreadPoolExecutor的实现原理。构造线程池
  ThreadPoolExecutor提供了4个构造方法,我们来看参数最全的那个构造方法:publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueRunnableworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){if(corePoolSize0maximumPoolSize0maximumPoolSizecorePoolSizekeepAliveTime0)thrownewIllegalArgumentException();if(workQueuenullthreadFactorynullhandlernull)thrownewNullPointerException();this。corePoolSizecorePoolSize;this。maximumPoolSizemaximumPoolSize;this。workQueueworkQueue;this。keepAliveTimeunit。toNanos(keepAliveTime);this。threadFactorythreadFactory;this。handlerhandler;}复制代码
  ThreadPoolExecutor的构造方法提供了7个参数:intcorePoolSize:线程池的核心线程数量,创建线程的数量小于等于corePoolSize时,会一直创建线程;intmaximumPoolSize:线程池的最大线程数量,当线程数量等于corePoolSize后且队列已满,允许继续创建(maximumPoolSizecorePoolSize)(maximumPoolSizecorePoolSize)(maximumPoolSizecorePoolSize)个线程;longkeepAliveTime:线程的最大空闲时间,当创建了超出corePoolSize数量的线程后,这些线程在不执行任务时能够存活的时间,超出keepAliveTime后会被销毁;TimeUnitunit:keepAliveTime的单位;BlockingQueueworkQueue:阻塞队列,用于保存等待执行的任务;ThreadFactorythreadFactory:线程工厂,用于创建线程,默认使用Executors。defaultThreadFactory()。RejectedExecutionHandlerhandler:拒绝策略,当队列已满,且没有空闲的线程时,执行的拒绝任务的策略。
  Tips:有些小伙伴会疑问,如果每次执行一个任务,执行完毕后再执行新任务,线程池依旧会创建corePoolSize个线程吗?答案是会的,后文解释。
  问题解答:创建线程池的参数有哪些?主控状态CTL与线程池状态
  ThreadPoolExecutor中定义了主控状态CTL和线程池状态:Themainpoolcontrolstate,ctl,isanatomicintegerpackingtwoconceptualfieldsworkerCount,indicatingtheeffectivenumberofthreadsrunState,indicatingwhetherrunning,shuttingdownetcprivatefinalAtomicIntegerctlnewAtomicInteger(ctlOf(RUNNING,0));privatestaticfinalintCOUNTBITSInteger。SIZE3;29privatestaticfinalintCOUNTMASK(1COUNTBITS)1;00011111111111111111111111111111privatestaticfinalintRUNNING1COUNTBITS;11100000000000000000000000000000privatestaticfinalintSHUTDOWN0COUNTBITS;00000000000000000000000000000000privatestaticfinalintSTOP1COUNTBITS;00100000000000000000000000000000privatestaticfinalintTIDYING2COUNTBITS;01000000000000000000000000000000privatestaticfinalintTERMINATED3COUNTBITS;01100000000000000000000000000000privatestaticintrunStateOf(intc){returncCOUNTMASK;}privatestaticintworkerCountOf(intc){returncCOUNTMASK;}privatestaticintctlOf(intrs,intwc){returnrswc;}复制代码
  CTL包含了两部分内容:线程池状态(runState,源码中使用rs替代)和工作线程数(workCount,源码中使用wc替代)。当看到位运算符和MASK一起出现时,就应该想到应用了位掩码技术。
  主控状态CTL的默认值是RUNNING0即:11100000000000000000000000000000。runStateOf方法返回低29位为0的CTL,与之对应的是线程池状态,workerCountOf方法则返回高3位为0的CTl,用低29位表示工作线程数量,所以线程池最多允许536870911个线程。
  Tips:工作线程指的是已经创建的线程,并不一定在执行任务,后文解释;位运算的可以参考编程技巧:高端的位运算;Java中二进制使用补码,注意原码,反码和补码间的转换。线程池的状态
  注释中对线程池的状态做出了详细的说明:RUNNING:Acceptnewtasksandprocessqueuedtasks
  SHUTDOWN:Dontacceptnewtasks,butprocessqueuedtasks
  STOP:Dontacceptnewtasks,dontprocessqueuedtasks,andinterruptinprogresstasks
  TIDYING:Alltaskshaveterminated,workerCountiszero,thethreadtransitioningtostateTIDYINGwillruntheterminated()hookmethod
  TERMINATED:terminated()hascompletedRUNNING:接收新任务,处理队列中的任务;SHUTDOWN:不接收新任务,处理队列中的任务;STOP:不接收新任务,不处理队列中的任务,中断正在执行的任务;TIDYING:所有任务已经执行完毕,并且工作线程为0,转换到TIDYING状态后将执行Hook方法terminated();TERMINATED:terminated()方法执行完毕。状态的转换
  注释中也对线程池状态的转换做出了详细说明:RUNNINGSHUTDOWNOninvocationofshutdown()
  (RUNNINGorSHUTDOWN)STOPOninvocationofshutdownNow()
  SHUTDOWNTIDYINGWhenbothqueueandpoolareempty
  STOPTIDYINGWhenpoolisempty
  TIDYINGTERMINATEDWhentheterminated()hookmethodhascompleted
  我们通过一张状态转换图来了解线程池状态之间的转换:
  结合源码,可以看到线程池的状态从RUNNING到TERMINATED其数值是单调递增的,换句话说线程池从活着到死透所对应的数值是逐步增大,所以可以使用数值间的比较去确定线程池处于哪一种状态。使用线程池
  我们已经对ThreadPoolExecutor有了一个整体的认知,现在可以创建并使用线程池了:ThreadPoolExecutorthreadPoolExecutornewThreadPoolExecutor(2,4,10,TimeUnit。SECONDS,newLinkedBlockingQueue(6));threadPoolExecutor。submit((){业务逻辑});复制代码
  这里我使用最简单的构造方法,我们看到在线程池中提交任务使用的是submit方法,该方法在抽象类AbstractExecutorService中实现:publicabstractclassAbstractExecutorServiceimplementsExecutorService{publicFuturelt;?submit(Runnabletask){if(tasknull)thrownewNullPointerException();RunnableFutureVoidftasknewTaskFor(task,null);execute(ftask);returnftask;}publicFuturelt;?submit(Runnabletask){if(tasknull)thrownewNullPointerException();RunnableFutureVoidftasknewTaskFor(task,null);execute(ftask);returnftask;}publicTFutureTsubmit(CallableTtask){if(tasknull)thrownewNullPointerException();RunnableFutureTftasknewTaskFor(task);execute(ftask);returnftask;}}复制代码
  submit的重载方法之间只有参数列表的差别,实现逻辑是相同的,均是先封装RunnableFuture对象,再调用ThreadPoolExecutorexecute方法。
  问题解答:submit()和execute()方法有什么区别?execute方法
  继承自Executor接口的execute方法是线程池的关键方法:publicvoidexecute(Runnablecommand){检测待执行任务if(commandnull){thrownewNullPointerException();}获取主控状态CTLintcctl。get();STEP1:当工作线程数量小于核心线程时,执行addWorker方法if(workerCountOf(c)corePoolSize){if(addWorker(command,true)){return;}cctl。get();}当工作线程数量大于核心线程数量时STEP2:首先判断线程池是否处于运行状态,接着尝试添加到队列中if(isRunning(c)workQueue。offer(command)){再次检查线程池状态intrecheckctl。get();不再处于RUNNING,则从队列中删除当前任务,并执行拒绝策略if(!isRunning(recheck)remove(command)){reject(command);}elseif(workerCountOf(recheck)0){addWorker(null,false);}}STEP3:无法添加到队列时,尝试执行addWorkerelseif(!addWorker(command,false))addWorker执行失败,则执行拒绝策略reject(command);}复制代码
  阅读execute方法的源码时需要知道一个前提,addWorker方法会检查线程池状态和工作线程数量,并执行工作任务。接着来看execute方法的3种执行情况:STEP1:线程池状态:RUNNING,工作线程数:小于核心线程数,此时执行addWorker(command,true);STEP2:线程池状态:RUNNING,工作线程数:等于核心线程数,队列:未饱和,添加到队列中;STEP3:线程池状态:RUNNING,工作线程数:等于核心线程数,队列:已饱和,执行addWorker(command,false)。
  需要重点关注STEP1的部分,还记得构造线程池最后的问题吗?STEP1便解释了为什么一个接一个的执行任务,依旧会创建出corePoolSize个线程。接着我们通过一张流程图展示execute方法的执行流程:
  流程图画得比较复杂,因为有些判断看似在一行中执行,实际上是借助了运算符短路的特性来决定是否执行,例如isRunning(c)workQueue。offer(command)中,如果isRunning(c)false则不会执行workQueue。offer(command)。addWorker方法privatebooleanaddWorker(RunnablefirstTask,booleancore)复制代码
  返回值为布尔类型表示是否成功执行,参数列表中有两个参数:RunnablefirstTask,待执行任务;booleancore,true表示最多允许创建corePoolSize个线程,false表示使用最多允许创建maximumPoolSize个线程。
  在分析execute方法的过程中,我们提前剧透了addWorker方法的功能:检查线程池状态和工作线程数量执行工作任务
  因此addWorker方法的源码部分我们分成两部分来看。
  Tips:再次强调本文使用Java11源码进行分析,在addWorker方法的实现上Java11与Java8存在差异。检查线程池状态和工作线程数量
  第一部分是线程池状态和工作线程数量检查的源码:retry:获取主控状态CTLfor(intcctl。get();;){注释1Java11相对友好很多,减少了很多!的使用,看起来比较符合人的思维这部分判断可以分成两部分:1。至少为SHUTDOWN状态2。条件3选1满足:21,至少为STOP状态22,firstTask不为空23,workQueue为空if(runStateAtLeast(c,SHUTDOWN)(runStateAtLeast(c,STOP)firstTask!nullworkQueue。isEmpty())){returnfalse;}for(;;){coretrue,保证工作线程数量小于核心线程数量corefalse,保证线程数量小于最大线程数量if(workerCountOf(c)((core?corePoolSize:maximumPoolSize)COUNTMASK)){returnfalse;}增加工作线程数量并退出if(compareAndIncrementWorkerCount(c)){breakretry;}如果至少是SHUTDOWN状态,则重新执行cctl。get();if(runStateAtLeast(c,SHUTDOWN)){continueretry;}}}复制代码
  注释1的代码并不复杂,只是需要结合线程池在不同状态下的处理逻辑来分析:当状态至少为SHUTDOWN时,什么情况不需要处理?添加新的任务(对应条件22)队列为空(对应条件23)当状态至少为STOP时,线程池应当立即停止,不接收,不处理。
  Tips:线程池状态的部分说线程池状态从RUNNING到TERMINATED是单调递增的,因此在Java11的实现中才会出现runStateAtLeast方法。执行工作任务
  第二部分是执行工作任务的源码:booleanworkerStartedfalse;booleanworkerAddedfalse;Workerwnull;try{创建Worker对象wnewWorker(firstTask);从worker对象中获取线程finalThreadtw。thread;if(t!null){上锁finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{intcctl。get();线程池状态检查RUNNING状态,或者小于STOP状态(处理队列中的任务)if(isRunning(c)(runStateLessThan(c,STOP)firstTasknull)){线程状态检查if(t。getState()!Thread。State。NEW){thrownewIllegalThreadStateException();}将Worker对象添加到workers中workers。add(w);workerAddedtrue;intsworkers。size();if(slargestPoolSize){记录线程池中出现过的最大线程数largestPoolSizes;}}}finally{mainLock。unlock();}if(workerAdded){启动线程t。start();workerStartedtrue;}}}finally{if(!workerStarted){addWorker执行失败addWorkerFailed中包含工作线程数减1的逻辑addWorkerFailed(w);}}returnworkerStarted;复制代码
  结合两部分代码,一个正向流程是这样的:检查状态:检查是否允许创建Worker,如果允许执行compareAndIncrementWorkerCount(c),CTL中工作线程数量1;执行任务:创建Worker对象,通过Worker对象获取线程,添加到workers中,最后启动线程。
  回过头看我们之前一直提到的工作线程,实际上是Worker对象,我们可以近似的将Worker对象和工作线程画上等号。
  问题解答:线程池中的线程是什么时间创建的?三调addWorker
  execute方法中,有3种情况调用addWorker方法:STEP1:addWorker(command,true)STEP2:addWorker(null,false)STEP3:addWorker(command,false)
  STEP1和STEP3很好理解,STEP1最多允许创建corePoolSize个线程,STEP3最多允许创建maximumPoolSize个线程。STEP2就比较难理解了,传入了空任务然后调用addWorker方法。
  什么情况下会执行到addWorker(null,false)?第1个条件:workerCountcorePoolSizeworkerCountgeqcorePoolSizeworkerCountcorePoolSize。第2个条件:isRunning(c)workQueue。offer(command)第3个条件:workerCountOf(recheck)0
  处于RUNNING状态的条件不难理解,矛盾的是第1个条件和第3个条件。根据这两个条件可以得到:corePoolSizeworkCount0corePoolSizeleqworkCount0corePoolSizeworkCount0,也就是说允许创建核心线程数为0的线程池。
  接着我们来看addWorker(null,false)做了什么?创建了Worker对象,添加到workers中,并调用了一次Thread。start,虽然没有任何待执行的任务。
  为什么要创建一个Worker对象?别忘了,已经执行过workQueue。offer(command)了,需要保证线程池中至少有一个Worker,才能执行workQueue中的任务。工具人Worker
  实际上ThreadPoolExecutor维护的工作线程就是Worker对象,我们来看Worker类的原码:privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{finalThreadthread;RunnablefirstTask;volatilelongcompletedTasks;Worker(RunnablefirstTask){setState(1);this。firstTaskfirstTask;通过默认线程工厂创建线程this。threadgetThreadFactory()。newThread(this);}publicvoidrun(){runWorker(this);}}复制代码
  Worker继承自AbstractQueuedSynchronizer,并实现了Runnable接口。
  我们重点关注构造方法,尤其是this。threadgetThreadFactory()。newThread(this),通过线程工厂创建线程,传入的Runnable接口是谁?
  是Worker对象本身,也就是说如果有worker。getThread()。start(),此时会执行Worker。run方法。
  Tips:AbstractQueuedSynchronizer就是大名鼎鼎的AQS,Worker借助AQS实现非重入独占锁,不过这部分不是今天的重点;Woker对象与自身的成员变量thread的关系可谓是水乳交融,好好梳理下,否则会很混乱。runWorker方法
  runWorker方法传入的是Worker对象本身,来看方法实现:finalvoidrunWorker(Workerw){注释1ThreadwtThread。currentThread();Worker对象中获取执行任务Runnabletaskw。firstTask;将Worker对象中的任务置空w。firstTasknull;w。unlock();booleancompletedAbruptlytrue;try{注释2while(task!null(taskgetTask())!null){w。lock();线程池的部分状态要中断正在执行的任务if((runStateAtLeast(ctl。get(),STOP)(Thread。interrupted()runStateAtLeast(ctl。get(),STOP)))!wt。isInterrupted()){wt。interrupt();}try{beforeExecute(wt,task);try{执行任务task。run();afterExecute(task,null);}catch(Throwableex){afterExecute(task,ex);throwex;}}finally{tasknull;w。completedTasks;w。unlock();}}completedAbruptlyfalse;}finally{processWorkerExit(w,completedAbruptly);}}复制代码
  大家可能会对注释1的部分比较迷惑,这个ThreadwtThread。currentThread()是什么鬼?别急,我带你从头梳理一下。
  以使用线程池中的代码为例,假设是首次执行,我们看主线程做了什么:
  刚才也说了,Worker对象的线程在启动后执行worker。run,也即是在runWorker方法中Thread。currentThread()是Worker对象的线程,并非主线程。
  再来看注释2的部分,第一次进入循环时,执行的task是Runnabletaskw。firstTask,即初次判断task!null,第二次进入循环时,task是通过taskgetTask()获取的。
  线程池中,除了当前Worker正在执行的任务,还有谁可以提供待执行任务?答案是队列,因此我们可以合理得猜测getTask()是获取队列中的任务。getTask方法privateRunnablegetTask(){上次从队列中获取任务是否超时booleantimedOutfalse;for(;;){线程池状态判断,某些状态下不需要处理队列中的任务intcctl。get();if(runStateAtLeast(c,SHUTDOWN)(runStateAtLeast(c,STOP)workQueue。isEmpty())){decrementWorkerCount();returnnull;}intwcworkerCountOf(c);allowCoreThreadTimeOut是否允许核心线程超时销毁,默认为false通过allowCoreThreadTimeOut方法设置wccorePoolSize为true表示启用了非核心线程booleantimedallowCoreThreadTimeOutwccorePoolSize;wcmaximumPoolSize,可能的情况是因为同时执行了setMaximumPoolSize方法timedtimedOut为true时,表示上次获取任务超时,当前需要进行超时控制wc1workQueue。isEmpty(),工作线程数量大于1或队列为空if((wcmaximumPoolSize(timedtimedOut))(wc1workQueue。isEmpty())){减少工作线程数量if(compareAndDecrementWorkerCount(c)){returnnull;}continue;}try{注释1从队列中获取待执行任务Runnablertimed?workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS):workQueue。take();if(r!null){returnr;}timedOuttrue;}catch(InterruptedExceptionretry){timedOutfalse;}}}复制代码
  注释1的部分有两种获取任务的方式:workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS),获取队首元素,如果当前队列为空,则等待指定时间后返回null;workQueue。take(),获取队首元素,如果队列为空,则一直等待,直到有返回值。
  线程池只会在一种情况下使用workQueue。take,即不允许核心线程超时销毁,同时线程池的工作线程数量小于核心线程数量,结合runWorker方法的源码我们可以得知,此时借助了阻塞队列的能力,保证runsWoker方法一直停留在taskgetTask()上,直到getTask()返回响应的任务。
  而在选择使用workQueue。poll时存在两种情况:允许核心线程超时销毁,即allowCoreThreadTimeOuttrue;当前工作线程数大于核心线程数,即线程池已经创建足够数量的核心线程,并且队列已经饱和,开始创建非核心线程处理任务。
  结合runWorker方法的源码我们可以知道,如果队列中的任务已经被消耗完毕,即getTask()返回null,则会跳出while循环,执行processWorkerExit方法。processWorkerExit方法privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){runWorker执行失败的场景if(completedAbruptly){decrementWorkerCount();}finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{completedTaskCountw。completedTasks;从workers中删除Workerworkers。remove(w);}finally{mainLock。unlock();}根据线程池状态判断是否结束线程池tryTerminate();intcctl。get();STOP之下的状态,runWorker正常结束时completedAbruptlyfalse保证至少有1个worker,用于处理队列中的任务if(runStateLessThan(c,STOP)){if(!completedAbruptly){intminallowCoreThreadTimeOut?0:corePoolSize;if(min0!workQueue。isEmpty()){min1;}if(workerCountOf(c)min){return;}}runWorker异常退出时,即completedAbruptlytrue或者是workers存活少于1个addWorker(null,false);}}复制代码
  processWorkerExit方法做了3件事:移除多余的Worker对象(允许销毁的核心线程或者非核心线程);尝试修改线程池状态;保证在至少存活1个Worker对象。
  Tips:我跳过了tryTerminate()方法的分析,对,是故意的
  问题解答:线程池的底层原理是如何实现的?销毁非核心线程
  设想一个场景:已经创建了足够数量的核心线程,并且队列已经饱和,仍然有任务提交时,会是怎样的执行流程?
  线程池创建非核心线程处理任务,当非核心线程执行完毕后并不会立即销毁,而是和核心线程一起去处理队列中的任务。那么当所有的任务都处理完毕之后呢?
  回到runWorker中,当所有任务执行完毕后再次进入循环,getTask中判断工作线程数大于和核心线程数,此时启用workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS),而keepAliveTime就是构建线程池时设定的线程最大空闲时间,当超过keepAliveTime后仍旧没有获得任务返回null,跳出runWorker的循环,执行processWorkerExit销毁非核心线程。ThreadPoolExecutor拾遗
  目前我们已经详细分析了线程池的执行流程,这里我会补充一些前文未涉及到的内容,因为是补充内容,所以涉及不会详细的解释源码。预创建线程
  我们在提到线程池的优点时会特别强调一句,池内保存了创建好的资源,使用时直接取出,但线程池好像依旧是首次接到任务后才会创建资源啊?
  实际上,线程池提供prestartCoreThread方法,用于预创建核心线程:publicbooleanprestartCoreThread(){returnworkerCountOf(ctl。get())corePoolSizeaddWorker(null,true);}复制代码
  如果你的程序需要做出极致的优化,可以选择预创建核心线程。关闭和立即关闭
  ThreadPoolExecutor提供了两个关闭的功能shutdown和shutdownNow:publicvoidshutdown(){finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{checkShutdownAccess();advanceRunState(SHUTDOWN);中断空闲线程interruptIdleWorkers();ScheduledThreadPoolExecutor的hookonShutdown();}finally{mainLock。unlock();}tryTerminate();}publicListRunnableshutdownNow(){ListRunnabletasks;finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{checkShutdownAccess();advanceRunState(STOP);中断所有线程interruptWorkers();tasksdrainQueue();}finally{mainLock。unlock();}tryTerminate();returntasks;}复制代码
  两者的差别还是很明显的:shutdown将线程池的状态改为SHUTDOWN,而shutdownNow则改为STOP;shutdown不返回队列中的任务,而shutdownNow返回队列中的任务,因为STOP状态不会再去执行队列的任务;shutdown中断空闲线程,而shutdownNow则是中断所有线程。
  从实现效果上来看关闭shutdown会更温和一些,而立即关闭shutdownNow则更为强烈,仿佛语气中带着不容置疑。拒绝策略
  线程池不会无条件的接收任务,有两种情况下它会拒绝任务:核心线程已满,添加到队列后,线程池不再处于RUNNING状态,此时从队列删除任务,并执行拒绝策略;核心线程已满,队列已满,非核心线程已满,此时执行拒绝策略。
  Java提供了RejectedExecutionHandler接口:publicinterfaceRejectedExecutionHandler{voidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor);}复制代码
  因此,我们可以通过实现RejectedExecutionHandler接口,完成自定义拒绝策略。另外,Java中也提供了4种默认拒绝策略:AbortPolicy:直接抛出异常;CallerRunsPolicy:提交任务的线程执行;DiscardOldestPolicy:丢弃队列中最先加入的线程;DiscardPolicy:直接丢弃,就是啥也不干。
  源码非常简单,大家自行阅读即可。Java提供了哪些线程池
  如果不想自己定义线程池,Java也贴心的提供了4种内置线程池,默认线程池通过Executors获取。
  Java的命名中,s后缀通常是对应工具类,通常提供大量静态方法,例如:Collections之于Collection。所以即便属于Executor体系中的一员,但却没办法在族谱上出现,打工人的悲惨命运。FixedThreadPoolpublicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable());}publicstaticExecutorServicenewFixedThreadPool(intnThreads,ThreadFactorythreadFactory){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable(),threadFactory);}复制代码
  固定大小线程池,核心线程数和最大线程数一样,看起来都还不错,主要的问题是通过无参构造器创建的LinkedBlockingQueue,它允许的最大长度是Integer。MAXVALUE。
  Tips:这也就是为什么《阿里巴巴Java开发手册》中不推荐的原因。CachedThreadPoolpublicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer。MAXVALUE,60L,TimeUnit。SECONDS,newSynchronousQueueRunnable());}publicstaticExecutorServicenewCachedThreadPool(ThreadFactorythreadFactory){returnnewThreadPoolExecutor(0,Integer。MAXVALUE,60L,TimeUnit。SECONDS,newSynchronousQueueRunnable(),threadFactory);}复制代码
  可以说是无限大的线程池,接到任务就创建新线程,另外SynchronousQueue是非常特殊的队列,不存储数据,每个put操作对应一个take操作。我们来分析下实际可能发生的情况:前提:大量并发涌入提交第一个任务,进入队列,判断核心线程数为0,执行addWorker(null,false),对应execute的SETP2;提交第二个任务,假设第一个任务未结束,第二个任务直接提交到队列中;提交第三个任务,假设第一个任务未结束,无法添加到队列中,执行addWorker(command,false)对应execute的SETP3。
  也就是说,只要提交的够快,就会无限创建线程。SingleThreadExecutorpublicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable()));}publicstaticExecutorServicenewSingleThreadExecutor(ThreadFactorythreadFactory){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable(),threadFactory));}复制代码
  只有一个线程的线程池,问题也是在于LinkedBlockingQueue,可以无限的接收任务。ScheduledExecutorpublicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize){returnnewScheduledThreadPoolExecutor(corePoolSize);}publicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize,ThreadFactorythreadFactory){returnnewScheduledThreadPoolExecutor(corePoolSize,threadFactory);}publicstaticScheduledExecutorServicenewSingleThreadScheduledExecutor(){returnnewDelegatedScheduledExecutorService(newScheduledThreadPoolExecutor(1));}publicstaticScheduledExecutorServicenewSingleThreadScheduledExecutor(ThreadFactorythreadFactory){returnnewDelegatedScheduledExecutorService(newScheduledThreadPoolExecutor(1,threadFactory));}复制代码
  用来执行定时任务,DelegatedScheduledExecutorService是对ScheduledExecutorService的包装。
  在Executor体系的族谱中,是有体现到ScheduledExecutorService和ScheduledThreadPoolExecutor的,这部分留给大家自行分析了。
  除了以上4种内置线程池外,Java还提供了内置的ForkJoinPool:publicstaticExecutorServicenewWorkStealingPool(intparallelism){returnnewForkJoinPool(parallelism,ForkJoinPool。defaultForkJoinWorkerThreadFactory,null,true);}publicstaticExecutorServicenewWorkStealingPool(){returnnewForkJoinPool(Runtime。getRuntime()。availableProcessors(),ForkJoinPool。defaultForkJoinWorkerThreadFactory,null,true);}复制代码
  这部分是Java8之后提供的,我们暂时按下不表,放到后期关于ForkJoin框架中详细解释。
  问题解答:Java提供了哪些线程池?合理设置线程池
  通常我们在谈论合理设置线程池的时候,指的是设置线程池的corePoolSize和maximumPoolSize,合理的设置能够最大化的发挥线程池的能力。
  我们先来看美团技术团队的调研结果:
  无论是哪种公式,都是基于理论得出的结果,但往往理论到工程还有很长得一段路要走。
  按照我的经验,合理的设置线程池可以汇总成一句话:根据理论公式预估初始设置,随后对核心业务进行压测调整线程池设置。
  Java也提供了动态调整线程池的能力:publicvoidsetThreadFactory(ThreadFactorythreadFactory);publicvoidsetRejectedExecutionHandler(RejectedExecutionHandlerhandler);publicvoidsetCorePoolSize(intcorePoolSize);publicvoidsetMaximumPoolSize(intmaximumPoolSize);publicvoidsetKeepAliveTime(longtime,TimeUnitunit);复制代码
  除了workQueue都能调整,本文不讨论线程池动态调整的实现。
  Tips:调研结果源自于《Java线程池实现原理及其在美团业务中的实践》;该篇文章中也详细的讨论了动态化线程池的思路,推荐阅读。结语
  线程池的大部分内容就到这里结束了,希望大家够通过本篇文章解答绝大部分关于线程池的问题,带给大家一些帮助,如果有错误或者不同的想法,欢迎大家留言讨论

浓眉在巅峰期为何下滑?实际他可能已过巅峰期!全面分析让你信服安东尼戴维斯,外号浓眉哥,从2012年以状元身份进入联盟开始,就被外界普遍看好会在以后统治全联盟。而浓眉哥也确实不负所望,逐渐成长为联盟优秀的大个子球员,并在201920赛季与……日本遗孤的90岁中国养母去世!曾承受无数骂名!只因大爱无疆在许多时候,我们的观念和想法都会被我们的立场所影响。就算是一个十恶不赦的杀人罪犯,在执行死刑之前他的家人依然会觉得这名罪犯值得怜悯。同样的,由于历史的原因,日本这个国家和我们有……日本三年级的小学生说自己的优点是活着?母亲泪奔!道出难言之隐对于一个处于童年的孩子来说,本来这个世界就是单纯的,没有太多包装和掩饰,所以在这个年龄段的孩子们说的话,往往都是发自内心的声音。有的时候,孩子的话语往往能够体现出这个孩子的家庭……日本推出只针对于女性的健身房!为防止侵犯?顾客服务很贴心虽然说在如今我们的时代,对于男女的观念已经不再和过去相同,人们的思想也变得越来越开放。但是在一些场合,有时候异性之间会造成心理上的隔阂,会让彼此感到不舒服。这些时候,保持一定的……男人在日本能够休产假?日本相关法律改正!近百家企业开始实施一直以来,产假都是女性员工的专属假期,男性的员工是不能享受的。背后的逻辑其实很简单,就是男人又不用生孩子,凭什么不上班?女人在生育的时候,需要休息,需要养胎,需要各种各样的照顾……日本无名之辈的画家父亲,死后作品大火!女儿希望父亲能够看到在日本,画家这种职业已经不是很多见了,随着时代的发展,职业的倾向也逐渐发生了变化。原来以艺术和欣赏目的为主的画家,逐渐变成了各路漫画和杂志的画师,制作的作品也逐渐向商业和娱乐性……23城发放千万元红包金证股份迎数字人民币发展机遇国庆假期临近,出游、餐饮、购物等需求将迎来一轮高峰,数字人民币民生消费的消费促进也愈发重要。近日,中国银行、建设银行、民生银行在全国范围内23座数字人民币试点城市发放千万元数字……金秋国庆,约惠北京不可不看的假期消费指南美食篇无论是老胡同里的新味道,还是飘散着辣椒香气的簋街,亦或是国际范的时尚大餐。十一在北京过节,当然不能错过在熙熙攘攘、人来人往的北京城探访让然欲罢不能的美食。作为专业食客,在……环游冰岛别名烟城,世界上纬度最高的首都雷克雅未克为什么叫烟城呢?申明一点,那里的空气可是好得一塌糊涂,哪来的烟啊?原来,雷克雅未克遍布温泉,时常有水蒸气蒸腾漂浮,当初第一批登岛的维京人看到后以为是烟尘,此后以讹传讹,就称其为……我国20个旅游城市行政区划调整更名为景区名对旅游发展的影响分20世纪80年代以来进行行政区划调整的旅游城市,大部分以区域内比较知名的景区更改地名,而其它的行政要素基本不变。根据城镇规划君何方洪统计,我国通过行政区划调整或更名的旅游城市大……加币汇率一夜暴跌,兑换人民币直逼4字头,兑换加币正当时乐活蒙城加拿大蒙特利尔12月31日报道:最近这段时间密切关注加拿大汇率走势的小伙伴们应该都已经感觉到,啥叫冰火两重天!啥叫过山车!乐活君还记得上个月月初的时候,加币汇率飙……睡前保健提高睡眠质量的关键睡眠可以调节各种生理功能,稳定神经系统的平衡,是养生的一个重要环节。有规律的、保质保量的睡眠,有助于增强体质和延年益寿。那么,怎样才能有好的睡眠呢?关键是要掌握一些睡前的保健方……
我在建水古城,躲避喧嚣,感受古香古色古城,对我来说,总是致命的吸引!云南,又是多么神奇的地方啊!云南建水古城,你或者听说过它的名字,这次我终于走近了它,亲眼感受,每一块青砖黛瓦的明暗,被阳光抚摸过的每一寸青石板,……曼联9400万敲定安东尼!滕哈赫正式告知C罗距离夏窗关闭还剩1周左后的时间,引援工作正密锣紧鼓地进行着。而C罗的去留影响着球队的未来。英超第3轮,曼联21笑傲双红会,C罗本场85分钟替补出战,登场仅仅5分钟。赛前C……一万人中,能有多少人活到80岁?活到多少岁算长寿?越长寿越好小区的几位阿姨坐一起晒太阳时,聊起了长寿的话题:有人说自己能活到80岁就可以了,活太久也不是一件好事,要家人照顾。有人说心态要好点,不要刻意的去想自己能活到多少岁,……日本探月活动在2022正式启动以日本宇宙航空研究开发机构(JAXA)为中心的日本探月活动将在2022年正式启动。2月以后,日本将用美国的火箭发射计划首次着陆月球的无人探测器,最早在2022年度和2023年度……酱油生抽老抽味极鲜它们的区别在哪里?我们平时炒菜讲究的是色香味俱全,这其中自然少不了各种调味料的作用,平时我们炒菜的时候,会用到各种各样的调味品,例如盐、味精、鸡精、酱油、醋。。。。。。盐、味精等基础调味品……夜读遇事不争,遇难不避,遇错不责朝着心中的目标笃定前行。一个人思想的深度,决定了他的胸襟和气度。格局大的人,不会困于方寸之地,而是从容豁达,遇事沉稳。遇事不争内心丰盈的人,从不活在别人……137。77亿年!我们是如何对宇宙年龄进行估算的?估算的值准前不久,据物理学家组织网报道,康奈尔大学领导的一支国际天文学家小组,利用由美国国家科学基金会管理的阿塔卡马宇宙学望远镜(ACT)的数据和宇宙几何学,对宇宙年龄进行再次评估,他们……5岁侄女穿短裙坐叔叔大腿,爸爸不以为意,这些行为真的合适吗?大家好,欢迎来到超说教育。现在家长都十分注重孩子的性别教育。我们经常在日常生活中看到许多异性亲属和孩子之间亲密相处,他们很容易缺乏边界感。一直打着对孩子偏爱的名号,觉得不……春节期间如何做好一锅卤菜来招待亲朋好友传统卤菜基本流程:熬高汤炒糖色。红曲米。黄栀子(调色用)香辛料调味料下食材(预处理)卤菜是一套系统工程,为什么我们自己做的卤菜没有外面买的好吃,除了技术层面最重要的就是高……秋天,宁可少吃肉,也要多吃酸,这6道家常菜酸爽开胃又润燥秋天,宁可少吃肉,也要多吃酸,这6道家常菜酸爽开胃又润燥,应季而食,身体更健康老话说:秋后三天不吃酸,走路都要打窜窜,进入秋季后天气渐渐干燥,人体也会有很大变化,会出现便……分享皮肤松弛的五大原因和带你护肤不走弯路一般我们皮肤的松弛它是自然的一个老化所导致的。但是也有一部分的原因是因为生活的因素而引起的。那接下来我为大家来讲解一下还有哪些因素会导致我们的皮肤松弛的呢?首先第一个就是……中澳机票价应声暴跌!机票搜索飙至一年最高昨天,我们报道的中国官宣取消航班熔断!隔离缩短为53!朋友圈炸了,不再判定密接,20条优化措施公布这则消息可谓对广大海外的小伙伴们打了一剂强心剂,回国真的指日可待!而对于……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网