前言 Java中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理的使用线程池可以带来多个好处: (1)降低资源消耗。通过重复利用已创建的线程降低线程在创建和销毁时造成的消耗。 (2)提高响应速度。当处理执行任务时,任务可以不需要等待线程的创建就能立刻执行。 (3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。1、线程池的实现原理 线程池的处理流程如上图所示 线程池中通过ctl字段来表示线程池中的当前状态,主池控制状态ctl是AtomicInteger类型,包装了两个概念字段:workerCount和runState,workerCount表示有效线程数,runState表示是否正在运行、正在关闭等状态。使用ctl字段表示两个概念,ctl的前3位表示线程池状态,线程池中限制workerCount为(229)1(约5亿)个线程,而不是(231)1(20亿)个线程。workerCount是允许启动和不允许停止的工作程序的数量。该值可能与实际的活动线程数暂时不同,例如,当ThreadFactory在被询问时未能创建线程时,以及退出线程在终止前仍在执行记时。用户可见的池大小报告为工作集的当前大小。runState提供主要的生命周期控制,取值如下表所示: 字段名 含义 RUNNING 接受新任务并处理排队任务 SHUTDOWN 不接受新任务,但处理排队任务 STOP 不接受新任务,不处理排队任务,并中断正在进行的任务 TIDYING 所有任务都已终止,workerCount为零,转换到状态TIDYING的线程将运行terminate()方法 TERMINATED terminate()方法执行完成 runState随着时间的推移而改变,在awaitTermination()方法中等待的线程将在状态达到TERMINATED时返回。状态的转换为: RUNNINGSHUTDOWN在调用shutdown()时,可能隐含在finalize()中 (RUNNING或SHUTDOWN)STOP在调用shutdownNow()时 SHUTDOWNTIDYING当队列和线程池都为空时 STOPTIDYING当线程池为空时 TIDYINGTERMINATED当terminate()方法完成时 开发人员如果需要在线程池变为TIDYING状态时进行相应的处理,可以通过重载terminated()函数来实现。 结合上图说明线程池ThreadPoolExecutor执行流程,使用execute()方法提交任务到线程池中执行时分为4种场景: (1)线程池中运行的线程数量小于corePoolSize,创建新线程来执行任务。 (2)线程池中运行线程数量不小于corePoolSize,将任务加入到阻塞队列BlockingQueue。 (3)如果无法将任务加入到阻塞队列(队列已满),创建新的线程来处理任务(这里需要获取全局锁)。 (4)当创建新的线程数量使线程池中当前运行线程数量超过maximumPoolSize,线程池中拒绝任务,调用RejectedExecutionHandler。rejectedExecution()方法处理。 源码分析:publicvoidexecute(Runnablecommand){if(commandnull)thrownewNullPointerException();intcctl。get();如果线程数小于基本线程数,创建线程执行if(workerCountOf(c)corePoolSize){if(addWorker(command,true))return;cctl。get();}如果线程数不小于基本线程数,将任务添加到队列中if(isRunning(c)workQueue。offer(command)){intrecheckctl。get();if(!isRunning(recheck)remove(command))reject(command);elseif(workerCountOf(recheck)0)addWorker(null,false);}如果队列已满,创建新的线程去处理elseif(!addWorker(command,false))执行拒绝策略reject(command);}复制代码 线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。2、线程池的创建和使用创建线程池 创建线程池之前,首先要知道创建线程池中的核心参数: corePoolSize(核心线程数大小):当提交任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。 runnableTaskQueue(任务队列):用于保存等待执行任务的阻塞队列。一般选择以下几种: ArrayBlockingQueue:基于数组的有界阻塞队列,按照FIFO原则对元素进行排序。 LinkedBlockingQueue:基于链表的阻塞队列,按照FIFO原则对元素进行排序。 SynchronousQueue:同步阻塞队列,也是不存储元素的阻塞队列。每一个插入操作必须要等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。 PriorityBlockingQueue:优先阻塞队列,一个具有优先级的无限阻塞队列。 maximumPoolSize(最大线程数大小):线程池允许创建的最大线程数,当队列已满,并且线程池中的线程数小于最大线程数,则线程池会创建新的线程执行任务。当使用无界队列时,此参数无用。 RejectedExecutionHandler(拒绝策略):当任务队列和线程池都满了,说明线程池处于饱和状态,那么必须使用拒绝策略来处理新提交的任务。JDK内置拒绝策略有以下4种: AbortPolicy:直接抛出异常 CallerRunsPolicy:使用调用者所在的线程来执行任务 DiscardOldestPolicy:丢弃队列中最近的一个任务来执行当前任务 DiscardPolicy:直接丢弃不处理 可以根据应用场景来实现RejectedExecutionHandler接口自定义处理策略。publicinterfaceRejectedExecutionHandler{voidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor);}复制代码 keepAliveTime(线程存活时间):线程池的工作线程空闲后,保持存活的时间。 TimeUnit(存活时间单位):可选单位DAYS(天)、HOURS(小时)、MINUTES(分钟)、MILLISECONDS(毫秒)、MICROSECONDS(微妙)、NANOSECONDS(纳秒)。 ThreadFactory(线程工厂):可以通过线程工厂给创建出来的线程设置有意义的名字。 创建线程池主要分为两大类,第一种是通过Executors工厂类创建线程池,第二种是自定义创建线程池。根据《阿里java开发手册》中的规范,线程池不允许使用Executors去创建,原因是规避资源耗尽的风险。使用Executors工厂类创建 创建一个单线程化的线程池publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable()));}复制代码 创建固定线程数的线程池publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit。MILLISECONDS,newLinkedBlockingQueueRunnable());}复制代码 以上两种创建线程池方式使用链表阻塞队列来存放任务,实际场景中可能会堆积大量请求导致OOM 创建可缓存线程池publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0,Integer。MAXVALUE,60L,TimeUnit。SECONDS,newSynchronousQueueRunnable());}复制代码 允许创建的线程数量最大为Integer。MAXVALUE,当创建大量线程时会导致CPU处于重负载状态和OOM的发生自定义创建线程池publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueueRunnableworkQueue){this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,Executors。defaultThreadFactory(),defaultHandler);}复制代码向线程池提交任务 向线程池提交任务可以使用两个方法,分别为execute()和submit()。 execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute()方法中传入的是Runnable类的实例。publicstaticvoidmain(String〔〕args){。。。threadPool。execute(newRunnable{publicvoidrun(){dosomething。。。}});。。。}复制代码 submit()方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值。get()方法会阻塞当前线程直到任务完成,使用get(longtimeout,TimeUnitunit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完。publicstaticvoidmain(String〔〕args){。。。FutureObjectfuturethreadPool。submit(handleTask);try{Objectsresfuture。get();}catch(InterruptedExceptione){处理中断异常}catch(ExecutionExceptione){处理无法执行异常}finally{threadPool。shutdown();}。。。}复制代码关闭线程池 可以通过调用线程池的shutdown()或shutdownNow()方法来关闭线程池。他们的原理是遍历线程池中的工作线程,然后逐个调用interrupt()方法来中断线程,所以无法响应中断任务可能永远无法终止。publicvoidshutdown(){finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown();hookforScheduledThreadPoolExecutor}finally{mainLock。unlock();}tryTerminate();}publicListRunnableshutdownNow(){ListRunnabletasks;finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasksdrainQueue();}finally{mainLock。unlock();}tryTerminate();returntasks;}复制代码 shutdown()和shutdownNow()方法的区别在于shutdownNow方法首先将线程池的状态设置为STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow()privatevoidinterruptWorkers(){finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{for(Workerw:workers)w。interruptIfStarted();}finally{mainLock。unlock();}}。。。voidinterruptIfStarted(){Threadt;if(getState()0(tthread)!null!t。isInterrupted()){try{t。interrupt();}catch(SecurityExceptionignore){}}}复制代码shutdown()privatevoidinterruptIdleWorkers(){interruptIdleWorkers(false);}。。。privatevoidinterruptIdleWorkers(booleanonlyOne){finalReentrantLockmainLockthis。mainLock;mainLock。lock();try{for(Workerw:workers){Threadtw。thread;if(!t。isInterrupted()w。tryLock()){try{t。interrupt();}catch(SecurityExceptionignore){}finally{w。unlock();}}if(onlyOne)break;}}finally{mainLock。unlock();}}复制代码3、线程池参数设置推荐 线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。 (1)以任务型为参考的简单评估: 假设线程池大小的设置(N为CPU的个数) 如果纯计算的任务,多线程并不能带来性能提升,因为CPU处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为CPU数量或1;为什么1?因为可以防止N个线程中有一个线程意外中断或者退出,CPU不会空闲等待。 如果是IO密集型应用,则线程池大小设置为2N1。线程数CPU核数目标CPU利用率(1平均等待时间平均工作时间) (2)以任务数为参考的理想状态评估: 1)默认值corePoolSize1queueCapacityInteger。MAXVALUEmaxPoolSizeInteger。MAXVALUEkeepAliveTime60sallowCoreThreadTimeoutfalserejectedExecutionHandlerAbortPolicy()复制代码 2)如何设置需要根据相关值来决定tasks:每秒的任务数,假设为5001000taskCost:每个任务花费时间,假设为0。1sresponsetime:系统允许容忍的最大响应时间,假设为1s计算获取corePoolSize每秒需要多少个线程处理?threadcounttasks(1taskCost)taskstaskcout(5001000)x0。150100个线程。corePoolSize设置应该大于50根据8020原则,如果80的每秒任务数小于800,那么corePoolSize设置为80即可queueCapacity(coreSizePooltaskCost)responsetime计算可得queueCapacity800。11800。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行切记不能设置为Integer。MAXVALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。maxPoolSize(max(tasks)queueCapacity)(1taskCost)计算可得maxPoolSize(1000800)1020(50)(最大任务数队列容量)每个线程每秒处理能力最大线程数rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理keepAliveTime和allowCoreThreadTimeout采用默认通常能满足 以上都为理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpuload已经满了,则需要通过升级硬件和优化代码,降低taskCost来处理。 (仅为简单的理想状态的评估,可作为线程池参数设置的一个参考)4、线程池使用场景推荐场景一 与主业务无直接数据依赖的从业务可以使用异步线程池来处理,在项目初始化时创建线程池并交给将从业务中的任务提交给异步线程池执行能够缩短响应时间。创建线程池使用异步注解方式调用BeanpublicThreadPoolTaskExecutorasyncExecutorPool(){ThreadPoolTaskExecutorexecutornewThreadPoolTaskExecutor();配置核心线程数executor。setCorePoolSize(20);配置最大线程数executor。setMaxPoolSize(100);配置队列大小executor。setQueueCapacity(500);等待任务在关机时完成表明等待所有线程执行完executor。setWaitForTasksToCompleteOnShutdown(true);等待时间(默认为0,此时立即停止),并没等待xx秒后强制停止executor。setAwaitTerminationSeconds(60);配置线程池中的线程的名称前缀executor。setThreadNamePrefix(testasyncthread);rejectionpolicy:当pool已经达到maxsize的时候,如何处理新任务(CALLERRUNS:不在新线程中执行任务,而是有调用者所在的线程来执行)executor。setRejectedExecutionHandler(newThreadPoolExecutor。CallerRunsPolicy());初始化执行器executor。initialize();returnexecutor;}使用Async异步注解Async(asyncExecutorPool)publicvoidprocessTask1(){doSomething}复制代码 严禁在业务代码中起线程!!!创建线程池直接使用publicclassThreadPoolExecutorTest{privateThreadPoolExecutorexecutor;PostConstructpublicvoidinit(){线程池初始化ThreadPoolExecutorthreadPoolExecutornewThreadPoolExecutor(30,60,60,TimeUnit。SECONDS,newArrayBlockingQueue(200));threadPoolExecutor。setRejectedExecutionHandler(newThreadPoolExecutor。CallerRunsPolicy());ThreadFactorythreadFactorynewCustomizableThreadFactory(testcreatesthread);threadPoolExecutor。setThreadFactory(threadFactory);this。executorthreadPoolExecutor;}。。。publicvoidprocessTask(){Futurefutureexecutor。submit(doSomething。。。);}}复制代码场景二 当任务需要按照指定顺序(FIFO,LIFO,优先级)执行时,推荐创建使用单线程化的线程池。publicclassSingleExecutorTest{privateHashMapLong,ThreadPoolExecutorexecutorMapnewHashMap();。。。publicvoidinit(){线程池初始化for(inti0;i5;i){任务队列容量:1000ThreadPoolExecutorthreadPoolExecutornewThreadPoolExecutor(1,1,0,TimeUnit。SECONDS,newArrayBlockingQueue(1000));拒绝策略:静默丢弃,不抛异常threadPoolExecutor。setRejectedExecutionHandler(newThreadPoolExecutor。DiscardPolicy());ThreadFactorythreadFactorynewCustomizableThreadFactory(testSinglei);threadPoolExecutor。setThreadFactory(threadFactory);executorMap。put(Long。valueOf(i),threadPoolExecutor);}}。。。需要顺序执行的任务publicvoidprocessTask(){。。。获取单一线程池ThreadPoolExecutorexecutorexecutorMap。get(Long。valueOf(id5));向线程池中提交任务excutor。submit(doSomething。。。);}}复制代码总结 本文章主要说明了线程池的执行原理和创建方式以及推荐线程池参数设置和一般使用场景。在开发中,开发人员需要根据业务来合理的创建和使用线程池达到降低资源消耗,提高响应速度的目的。 原文链接:https:juejin。cnpost7067324722811240479