ThreadPoolExecutor先抛出几个问题线程池参数有哪些都是什么意思线程池状态线程池如何保证核心线程不死亡线程池执行过程线程池有什么好处常见的线程池以及使用场景 直接上最简单是使用实例publicstaticvoidmain(String〔〕args){BlockingQueueRunnableblockingQueuenewArrayBlockingQueue(10);ThreadFactorypoolNamenewThreadFactoryBuilder()。setNameFormat(handsameguyd)。build();ThreadPoolExecutorexecutornewThreadPoolExecutor(2,5,1,TimeUnit。MINUTES,blockingQueue,poolName);开心的话这里可以加一个循环,里面做一个随机休眠executor。execute((){System。out。println(早上起来不要照镜子,不然会被自己帅死);});}早上起来不要照镜子,不然会被自己帅死二话不说直接看源码构造 先看构造:其实比较简单就是构造一些属性Createsanew{codeThreadPoolExecutor}withthegiveninitialparameters。paramcorePoolSizethenumberofthreadstokeepinthepool,eveniftheyareidle,unless{codeallowCoreThreadTimeOut}issetparammaximumPoolSizethemaximumnumberofthreadstoallowinthepoolparamkeepAliveTimewhenthenumberofthreadsisgreaterthanthecore,thisisthemaximumtimethatexcessidlethreadswillwaitfornewtasksbeforeterminating。paramunitthetimeunitforthe{codekeepAliveTime}argumentparamworkQueuethequeuetouseforholdingtasksbeforetheyareexecuted。Thisqueuewillholdonlythe{codeRunnable}taskssubmittedbythe{codeexecute}method。paramthreadFactorythefactorytousewhentheexecutorcreatesanewthreadparamhandlerthehandlertousewhenexecutionisblockedbecausethethreadboundsandqueuecapacitiesarereachedthrowsIllegalArgumentExceptionifoneofthefollowingholds: {codecorePoolSize0} {codekeepAliveTime0} {codemaximumPoolSize0} {codemaximumPoolSizecorePoolSize}throwsNullPointerExceptionif{codeworkQueue}or{codethreadFactory}or{codehandler}isnullcorePoolSize核心线程数maximumPoolSize最大线程数keepAliveTime空闲线程的最大超时时间unit超时时间的单位workQueue工作队列,保存未执行的Runnable任务threadFactory创建线程的工厂类handler拒绝策略publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueRunnableworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){if(corePoolSize0maximumPoolSize0maximumPoolSizecorePoolSizekeepAliveTime0)thrownewIllegalArgumentException();if(workQueuenullthreadFactorynullhandlernull)thrownewNullPointerException();this。corePoolSizecorePoolSize;this。maximumPoolSizemaximumPoolSize;this。workQueueworkQueue;this。keepAliveTimeunit。toNanos(keepAliveTime);this。threadFactorythreadFactory;this。handlerhandler;}执行 来看执行,执行有2个方法一个是execute()另一个是submit()。先看execute()publicvoidexecute(Runnablecommand){if(commandnull)thrownewNullPointerException();Proceedin3steps:分为3个步骤1。IffewerthancorePoolSizethreadsarerunning,trytostartanewthreadwiththegivencommandasitsfirsttask。ThecalltoaddWorkeratomicallychecksrunStateandworkerCount,andsopreventsfalsealarmsthatwouldaddthreadswhenitshouldnt,byreturningfalse。1、当运行的线程数少于核心线程数的时候,尝试为当前的任务启动一个新的线程。通过调用addWorker自动检查runState和workerCount,防止可能增加的错误警报当它不应该线程,返回false。2。Ifataskcanbesuccessfullyqueued,thenwestillneedtodoublecheckwhetherweshouldhaveaddedathread(becauseexistingonesdiedsincelastchecking)orthatthepoolshutdownsinceentryintothismethod。Sowerecheckstateandifnecessaryrollbacktheenqueuingifstopped,orstartanewthreadiftherearenone。2、如果任务成功的排队,我们仍然需要做双重检查这个线程是否需要被加入。池关闭后进入此方法。所以我们重新检查状态,如果有必要回滚排队停止,或启动一个新线程3。Ifwecannotqueuetask,thenwetrytoaddanewthread。Ifitfails,weknowweareshutdownorsaturatedandsorejectthetask。3、如果新的新城不能排队任务,尝试添加一个新的线程。如果它失败了,关闭或饱和的所以拒绝这个任务。intcctl。get();if(workerCountOf(c)corePoolSize){1、if(addWorker(command,true))return;cctl。get();}2、if(isRunning(c)workQueue。offer(command)){intrecheckctl。get();if(!isRunning(recheck)remove(command))reject(command);elseif(workerCountOf(recheck)0)addWorker(null,false);}3、elseif(!addWorker(command,false))reject(command);} addWorker()firstTask任务core是否核心线程privatebooleanaddWorker(RunnablefirstTask,booleancore){1、自旋,确定是否可以创建worker。可以则跳出循环继续操作,否则返回falseretry:for(;;){intcctl。get();intrsrunStateOf(c);Checkifqueueemptyonlyifnecessary。检查线程池状态及队列是否空if(rsSHUTDOWN!(rsSHUTDOWNfirstTasknull!workQueue。isEmpty()))returnfalse;for(;;){intwcworkerCountOf(c);if(wcCAPACITYwc(core?corePoolSize:maximumPoolSize))returnfalse;CAS增长workerCount,成功则跳出循环if(compareAndIncrementWorkerCount(c))breakretry;重新获取ctl,状态改变则继续外层循环,否则在内层循环cctl。get();Rereadctlif(runStateOf(c)!rs)continueretry;elseCASfailedduetoworkerCountchange;retryinnerloop}}2、创建worker,这部分使用ReentrantLock锁booleanworkerStartedfalse;booleanworkerAddedfalse;Workerwnull;try{wnewWorker(firstTask);finalThreadtw。thread;if(t!null){finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{Recheckwhileholdinglock。BackoutonThreadFactoryfailureorifshutdownbeforelockacquired。获取到锁以后仍需要二次检查ctl,上一个获取到锁处理的线程可能会改变runState如ThreadFactory创建失败或线程池被shutdown等intrsrunStateOf(ctl。get());if(rsSHUTDOWN(rsSHUTDOWNfirstTasknull)){if(t。isAlive())precheckthattisstartablethrownewIllegalThreadStateException();加入工作队列workers。add(w);intsworkers。size();if(slargestPoolSize)largestPoolSizes;workerAddedtrue;}}finally{释放锁mainLock。unlock();}启动线程if(workerAdded){t。start();更改标志位workerStartedtrue;}}}finally{启动失败,处理启动失败线程。if(!workerStarted)addWorkerFailed(w);}returnworkerStarted;} 来看看worker长什么样子。 Worker是ThreadPoolExecutor的内部类,实现了AQS并继承了Runnable。privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{Thisclasswillneverbeserialized,butweprovideaserialVersionUIDtosuppressajavacwarning。privatestaticfinallongserialVersionUID6138294804551838833L;Threadthisworkerisrunningin。Nulliffactoryfails。每个worker有自己的内部线程,ThreadFactory创建失败时是nullfinalThreadthread;Initialtasktorun。Possiblynull。RunnablefirstTask;Perthreadtaskcounter完成的任务数volatilelongcompletedTasks;CreateswithgivenfirsttaskandthreadfromThreadFactory。paramfirstTaskthefirsttask(nullifnone)Worker(RunnablefirstTask){inhibitinterruptsuntilrunWorker禁止线程在启动前被打断setState(1);this。firstTaskfirstTask;this。threadgetThreadFactory()。newThread(this);}DelegatesmainrunlooptoouterrunWorkerrun方法publicvoidrun(){runWorker(this);}LockmethodsThevalue0representstheunlockedstate。Thevalue1representsthelockedstate。state0代表未锁;state1代表已锁protectedbooleanisHeldExclusively(){returngetState()!0;}protectedbooleantryAcquire(intunused){if(compareAndSetState(0,1)){setExclusiveOwnerThread(Thread。currentThread());returntrue;}returnfalse;}protectedbooleantryRelease(intunused){setExclusiveOwnerThread(null);setState(0);returntrue;}publicvoidlock(){acquire(1);}publicbooleantryLock(){returntryAcquire(1);}publicvoidunlock(){release(1);}publicbooleanisLocked(){returnisHeldExclusively();}终端启动的线程voidinterruptIfStarted(){Threadt;if(getState()0(tthread)!null!t。isInterrupted()){try{t。interrupt();}catch(SecurityExceptionignore){}}}} worker实现了简单的非重入互斥锁。worker实现锁的目的与传统锁的意义不太一样。其主要是为了控制线程是否可中断 再来看下worker的核心run方法finalvoidrunWorker(Workerw){ThreadwtThread。currentThread();Runnabletaskw。firstTask;w。firstTasknull;w。unlock();allowinterruptsbooleancompletedAbruptlytrue;try{循环直到tasknull(线程池关闭、超时等)注意这里的getTask()方法,我们配置的阻塞队列会在这里起作用while(task!null(taskgetTask())!null){执行前先加锁w。lock();Ifpoolisstopping,ensurethreadisinterrupted;ifnot,ensurethreadisnotinterrupted。ThisrequiresarecheckinsecondcasetodealwithshutdownNowracewhileclearinginterrupt为了确保线程池运行停止的时候线程运行中断,需要在第二种情况下进行重新获取ctlif((runStateAtLeast(ctl。get(),STOP)(Thread。interrupted()runStateAtLeast(ctl。get(),STOP)))!wt。isInterrupted())wt。interrupt();try{beforeExecute(wt,task);Throwablethrownnull;try{执行真正的run方法task。run();}catch(RuntimeExceptionx){thrownx;throwx;}catch(Errorx){thrownx;throwx;}catch(Throwablex){thrownx;thrownewError(x);}finally{afterExecute(task,thrown);}}finally{tasknull;w。completedTasks;w。unlock();}}completedAbruptlyfalse;}finally{线程退出工作processWorkerExit(w,completedAbruptly);}} 当没有任务的时候runWorker()方法会循环getTask()这个方法可能会阻塞privateRunnablegetTask(){booleantimedOutfalse;Didthelastpoll()timeout?for(;;){intcctl。get();intrsrunStateOf(c);Checkifqueueemptyonlyifnecessary。检查是否还继续处理任务if(rsSHUTDOWN(rsSTOPworkQueue。isEmpty())){decrementWorkerCount();returnnull;}intwcworkerCountOf(c);Areworkerssubjecttoculling?获取是否允许超市booleantimedallowCoreThreadTimeOutwccorePoolSize;if((wcmaximumPoolSize(timedtimedOut))(wc1workQueue。isEmpty())){if(compareAndDecrementWorkerCount(c))returnnull;continue;}try{Runnablertimed?workQueue。poll(keepAliveTime,TimeUnit。NANOSECONDS):workQueue。take();if(r!null)returnr;timedOuttrue;}catch(InterruptedExceptionretry){timedOutfalse;}}}回答疑问线程池参数有哪些都是什么意思corePoolSize核心线程数maximumPoolSize最大线程数keepAliveTime空闲线程的最大超时时间unit超时时间的单位workQueue工作队列,保存未执行的Runnable任务threadFactory创建线程的工厂类handler拒绝策略线程池如何保证核心线程不死亡runWorker方法中调用了getTask方法,这个方法会阻塞。线程池执行过程 线程池有什么好处1、节省资源、提高效率。线程的创建和销毁相对都是比较消耗资源,利用线程池可以在需要的时候直接获取一个线程这样能尽可能的避免了重复的创建和销毁线程。2、方便管理可以编写线程池管理代码对池中的线程统一进行管理,避免因无休止的创建线程导致系统崩溃。线程池状态()RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED 下面这两个问题算是留尾巴吧,后面再抽时间来看常见的线程池以及使用场景线程池的状态的流转扩展 在阅读源码的时候其实能看到一些变量定义初始化以及状态的判断都是用位运算判断的。比如不同系统这里可能不一样,这里假设是32privatefinalAtomicIntegerctlnewAtomicInteger(ctlOf(RUNNING,0));线程个数的表示〔位数〕(不同平台int类型范围不一样)不管int是多少位,反正高三位就是表示线程状态,剩余的位数表示线程数量线程数量privatestaticfinalintCOUNTBITSInteger。SIZE3;线程最大数量privatestaticfinalintCAPACITY(1COUNTBITS)1;高三位表示线程池状态运行状态privatestaticfinalintRUNNING1COUNTBITS;privatestaticfinalintSHUTDOWN0COUNTBITS;privatestaticfinalintSTOP1COUNTBITS;privatestaticfinalintTIDYING2COUNTBITS;privatestaticfinalintTERMINATED3COUNTBITS;获取线程池状态privatestaticintrunStateOf(intc){returncCAPACITY;}获取线程池有效线程数量privatestaticintworkerCountOf(intc){returncCAPACITY;}获取上面提到的32位int类型的数值privatestaticintctlOf(intrs,intwc){returnrswc;} 这里拿RUNNING状态具体,看下具体是怎么算的。RUNNING1COUNTBITS 这里需要假设一下Integer。SIZE32COUNTBITS29 那么RUNNING1COUNTBITS表示1二进制左移29位。1的二进制是多少在计算机中,负数以其正值的补码形式表达。补码反码11的源码0000000000000000000000000000000000000000000000000000000000000001反码1111111111111111111111111111111111111111111111111111111111111110补码反码111111111111111111111111111111111111111111111111111111111111111111左移29位溢出位丢弃,空余位补0得到下面的结果1110000000000000000000000000000000000000000000000000000000000000 因此RUNNING高三位为111,同样方式可计算SHUTDOWN:000STOP:001TIDYING:010TERMINATED:011获取线程池状态privatestaticintrunStateOf(intc){returncCAPACITY;}CAPACITY从上面可以得知为1111111(29bit),取反后就是1110000000(29个0)c上面的结果就可以获取到高三位,而后29位全部为0获取线程池有效线程数量privatestaticintworkerCountOf(intc){returncCAPACITY;}CAPACITY为000111111(29个1)cCAPACITY,就可以获取变量c的低29位的值获取上面提到的32位int类型的数值rs为线程状态,wc表示线程数量privatestaticintctlOf(intrs,intwc){returnrswc;}