XXLadmin源码解析
xxljob的admin服务是xxljob的调度中心,负责管理和调度注册的job,关于xxljob的使用,可以阅读参考阅读中的《XXLJOB分布式调度框架全面详解》,这里主要是介绍admin中的源码。
admin服务除了管理页面上的一些接口外,还有一些核心功能,比如:
1、根据job的配置,自动调度job;
2、接收executor实例的请求,实现注册和下线;
3、监视失败的job,进行重试;
4、结束一些异常的job;
5、清理和统计日志;
这些功能都是在admin服务启动后,在后台自动运行的,下面将详细介绍admin服务这些功能的实现。XxlJobAdminConfig
admin的配置类
XxlJobAdminConfig是admin服务的配置类,在admin服务启动时,它除了配置admin服务的一些参数外,还会启动admin服务的所有后台线程。属性
该类的属性主要分为5类:
1、配置文件中的参数,比如accessToken;
2、DAO层各个数据表的mapper;
3、Spring容器中的一些Bean,比如JobAlarmer、DataSource等;
4、私有变量XxlJobScheduler对象;privateXxlJobSchedulerxxlJobScheduler;
5、私有静态变量adminConfig,指向实例自身。privatestaticXxlJobAdminConfigadminConfignull;publicstaticXxlJobAdminConfiggetAdminConfig(){returnadminConfig;}方法
该类有两个重要方法,分别实现自接口InitializingBean、DisposableBean,作用如下:afterPropertiesSet方法,在Spring容器中Bean初始化完成之后,在该方法中进行初始化;destroy方法,在容器销毁Bean时,会执行销毁操作;
这两个方法分别调用了XxlJobScheduler对象的init、destroy方法,源码如下:OverridepublicvoidafterPropertiesSet()throwsException{adminConfigthis;xxlJobSchedulernewXxlJobScheduler();xxlJobScheduler。init();}Overridepublicvoiddestroy()throwsException{xxlJobScheduler。destroy();}
XxlJobAdminConfig作为admin服务的配置类,作用就是在Spring容器启动时,调用XxlJobScheduler的初始化方法,来初始化和启动admin服务的功能。XxlJobScheduler
XxlJobScheduler的作用就是调用各个辅助类(xxxHelper)来启动和结束不同的线程和功能,初始化方法init的代码如下:
如果把XxlJobScheduler看做是一个启动器,那么init方法就是启动按钮,XxlJobAdminConfig的作用就是按下这个按钮。publicvoidinit()throwsException{0、初始化国际化消息,不是很重要忽略initI18n();1、初始化调度器的线程池JobTriggerPoolHelper。toStart();2、启动注册监视器线程JobRegistryHelper。getInstance()。start();3、启动失败job监视器线程,查询失败日志进行重试JobFailMonitorHelper。getInstance()。start();4、启动丢失job监视器线程,一些job发出调度指令后,一直没有响应,状态一直是运行中JobCompleteHelper。getInstance()。start();5、启动日志统计和清理线程JobLogReportHelper。getInstance()。start();6、启动调度线程,定时调度jobJobScheduleHelper。getInstance()。start();logger。info(initxxljobadminsuccess。);}
下面我们主要介绍init中各个类及其作用,最后再简单一下介绍destroy的作用。JobTriggerPoolHelper
Trigger线程池的辅助类:管理Trigger线程池、添加trigger线程到线程池
当admin服务向executor实例发出一个调度请求来执行job时,会调用XxlJobTrigger。trigger()方法把要传输的参数(比如jobid、jobHandler、joblogid、阻塞策略等,包装成TriggerParam对象)传给ExecutorBiz对象来执行一次调度。
xxljob对调度过程做了两个优化:每次发出调度请求时,会新建一个线程,异步执行XxlJobTrigger的方法;在新建线程时,会根据执行XxlJobTrigger方法的耗时,选择不同的线程池;属性和start
初始化线程池
JobTriggerPoolHelper在toStart方法中初始化了它的两个线程池属性,代码如下:快速、慢速线程池,分别执行调度任务不一样的任务,实现隔离,避免相互阻塞privateThreadPoolExecutorfastTriggerPoolnull;privateThreadPoolExecutorslowTriggerPoolnull;publicvoidstart(){fastTriggerPoolnewThreadPoolExecutor(10,至少200XxlJobAdminConfig。getAdminConfig()。getTriggerPoolFastMax(),60L,TimeUnit。SECONDS,newLinkedBlockingQueue(1000),rnewThread(r,xxljob,adminJobTriggerPoolHelperfastTriggerPoolr。hashCode()));slowTriggerPoolnewThreadPoolExecutor(10,至少100XxlJobAdminConfig。getAdminConfig()。getTriggerPoolSlowMax(),60L,TimeUnit。SECONDS,newLinkedBlockingQueue(2000),rnewThread(r,xxljob,adminJobTriggerPoolHelperslowTriggerPoolr。hashCode()));}
每次有调度请求时,就会在这两个线程池中创建线程,创建线程的逻辑在addTrigger方法中。addTrigger
新建线程,调用XxlJobTrigger。trigger()方法
不同job存在执行时长的差异,为了避免不同耗时job之间相互阻塞,xxljob根据job的响应时间,对job进行了区分,主要体现在:如果job耗时短,就在fastTriggerPool线程池中创建线程;如果job耗时长且调用频繁,就在slowTriggerPool线程池中创建线程;
如果快job与调用频繁的慢job在同一个线程池中创建线程,慢job会占用大量的线程,导致快job线程不能及时运行,降低了线程池和线程的利用率。xxljob通过快慢隔离,避免了这个问题。
问题:如果快慢job使用同一个线程池时,慢job占用了线程,导致快job线程不能及时运行,正常情况下,我们的反应是增加线程池的线程数,这样做能否解决问题?
不能,因为慢job还是会占用大量线程,抢占了快job的线程资源;增加线程池中的线程数不但没有提升利用率,还会导致大量线程看空闲,利用率反而降低了。最好的方法还是用两个线程池把两者隔离,可以合理地使用各自线程池的资源。
为了记录慢job的超时次数,代码中使用一个map(变量jobTimeoutCountMap)来记录一分钟内job超时次数,key值是jobid,value是超时次数。在调用XxlJobTrigger。trigger()方法之前,会先判断map中,该jobid的超时次数是否大于10,如果大于10,就是使用slowTriggerPool,代码如下:属性变量privatevolatileConcurrentMapInteger,AtomicIntegerjobTimeoutCountMapnewConcurrentHashMap();选择线程池,如果在一分钟内调度超过10次,使用slowTriggerPoolThreadPoolExecutortriggerPoolfastTriggerPool;AtomicIntegerjobTimeoutCountjobTimeoutCountMap。get(jobId);if(jobTimeoutCount!nulljobTimeoutCount。get()10){triggerPoolslowTriggerPool;}
调用XxlJobTrigger。trigger()方法后,根据两个值来更新jobTimeoutCountMap的值:当前时间与上次调用是否在一分钟以内,如果不在一分钟以内,就清空map;本次XxlJobTrigger。trigger()的调用是否超过500毫秒,如果超过500毫秒,就在map中增加jobid的超时次数;
和上面的代码相结合,一个job在一分钟内有10次调用超过500毫秒,就认为该job是一个频繁调度且耗时的job。
代码如下:属性变量,初始值等于JobTriggerPoolHelper对象构造时的分钟数每次调用XxlJobTrigger。trigger()方法时,值等于上一次调用的分钟数privatevolatilelongminTimSystem。currentTimeMillis()60000;当前时间的分钟数,如果和前一次调用不在同一分钟内,就清空jobTimeoutCountMaplongminTimnowSystem。currentTimeMillis()60000;if(minTim!minTimnow){minTimminTimnow;jobTimeoutCountMap。clear();}开始调用XxlJobTrigger。trigger()的时间longstartSystem。currentTimeMillis();。。。调用XxlJobTrigger。trigger()方法XxlJobTrigger。trigger(jobId,triggerType,failRetryCount,executorShardingParam,executorParam,addressList);如果用时超过500毫秒,就增加一次它的慢调用次数longcostSystem。currentTimeMillis()start;if(cost500){AtomicIntegertimeoutCountjobTimeoutCountMap。putIfAbsent(jobId,newAtomicInteger(1));if(timeoutCount!null){timeoutCount。incrementAndGet();}}
XxlJobTrigger。trigger()方法在下面的XxlJobTrigger类中有详细介绍,这里只需要知道它会对一个job发起一次执行请求。
在该类中,属性变量minTim和jobTimeoutCountMap都使用volatile来修饰,保证了并发调用addTrigger时数据的一致性和可见性。
问题:为什么要每分钟清空一次map中的数据?
admin服务发起job调度请求时,是在静态方法publicstaticvoidtrigger()中调用静态变量privatestaticJobTriggerPoolHelperhelper的addTrigger方法来发起请求的。minTim和jobTimeoutCountMap虽然不是static修饰的,但可以看做是全局唯一的(因为持有它们的对象是全局唯一的),因此这两个参数维护的是admin服务全局的调度时间和超时次数,为了避免记录的数据量过大,需要每分钟清空一次数据的操作。JobRegistryHelper
executor注册和下线的辅助类
admin服务提供了接口给executor来注册和下线,另外,当executor长时间(90秒)没有发心跳时,要把executor自动下线。前一个功能通过暴露一个接口来接收请求,后一个功能需要开启一个线程,定时更新过期executor的状态。
xxljob为了提升admin服务的性能,在前一个功能的接口接收到executor的请求时,不是同步执行,而是在线程池中开启一个线程,异步执行executor的注册和下线请求。
JobRegistryHelper类就负责管理这个线程池和定时线程的。注册和下线
线程池的定义和初始化代码如下:注册或移除executor的线程池privateThreadPoolExecutorregistryOrRemoveThreadPoolnull;注册或移除线程池registryOrRemoveThreadPoolnewThreadPoolExecutor(2,10,30L,TimeUnit。SECONDS,newLinkedBlockingQueue(2000),rnewThread(r,xxljob,adminJobRegistryMonitorHelperregistryOrRemoveThreadPoolr。hashCode()),(r,executor){r。run();logger。warn(xxljob,registryorremovetoofast,matchthreadpoolrejectedhandler(runnow)。);});
线程池的核心线程数是2,最大线程数是10,允许一个2000的队列,如果executor实例很多,会导致注册延迟的。当然,一般不会把2000个executor注册到同一个admin服务。
executor实例在发起注册和下线请求时,会调用AdminBizImpl类的对应方法,该类的方法如下:
可以看到,AdminBizImpl类的两个方法都是调用了JobRegistryHelper方法来实现,其中JobRegistryHelper。registry方法代码如下(registryRemove代码与之相似):publicReturnTStringregistry(RegistryParamregistryParam){校验参数if(!StringUtils。hasText(registryParam。getRegistryGroup())!StringUtils。hasText(registryParam。getRegistryKey())!StringUtils。hasText(registryParam。getRegistryValue())){returnnewReturnT(ReturnT。FAILCODE,IllegalArgument。);}在线程池中创建线程registryOrRemoveThreadPool。execute((){updateintretXxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。registryUpdate(registryParam。getRegistryGroup(),registryParam。getRegistryKey(),registryParam。getRegistryValue(),newDate());update失败,insertif(ret1){XxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。registrySave(registryParam。getRegistryGroup(),registryParam。getRegistryKey(),registryParam。getRegistryValue(),newDate());刷新,空方法freshGroupRegistryInfo(registryParam);}});returnReturnT。SUCCESS;}
这两个方法是通过在线程池registryOrRemoveThreadPool中创建线程来异步执行请求,然后把数据更新或新建到数据表xxljobregistry中。更新和管理Jobgroup
当executor注册到admin服务后(数据入库到xxljobregistry表),是不会在页面上显示的,需要要用户手动添加jobgroup数据(添加到xxljobgroup表),admin服务会自动把用户添加的jobgroup数据与xxljobregistry数据关联。这就需要admin定时从xxljobgroup表读取数据,关联xxljobregistry表和xxljobgroup表的数据。
这个功能是与executor自动下线功能在同一个线程中实现,该线程的主要逻辑是:从xxljobgroup表查询出自动设置address的group列表,如果group列表不为空,才继续向下执行;从xxljobregistry表删除不再存活(90秒内都没有更新)的记录,避免无效记录影响后续操作;从xxljobregistry表取出存活的记录,根据appName设置xxljobgroup记录的addresslist值,多个address使用逗号拼接;sleep30秒,这个线程每30秒执行一次。
相关代码如下:注册监视器线程privateThreadregistryMonitorThread;停止标志位privatevolatilebooleantoStopfalse;自动注册的jobgroupListXxlJobGroupgroupListXxlJobAdminConfig。getAdminConfig()。getXxlJobGroupDao()。findByAddressType(0);删除已经下线(90秒内没有心跳)的注册adminexecutorListIntegeridsXxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。findDead(RegistryConfig。DEADTIMEOUT,newDate());if(ids!nullids。size()0){XxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。removeDead(ids);}刷线还在线(90秒内有心跳)的adminexecutor的地址mapListXxlJobRegistrylistXxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。findAll(RegistryConfig。DEADTIMEOUT,newDate());HashMapString,ListStringappAddressMapnewHashMap(list。size());略。。。每30秒执行一次TimeUnit。SECONDS。sleep(RegistryConfig。BEATTIMEOUT);
从这里可以看出,如果是对外接口(接收请求等)的功能,使用线程池和异步线程来实现;如果是一些自动任务,则是通过一个线程来定时执行。JobFailMonitorHelper
Job执行失败的监视线程辅助类
如果一个Job调度后,没有响应返回,需要定时重试。作为一种自动执行的任务,很显然可以像前面JobRegistryHelper一样,使用一个线程定时重试。
在这个类中,定义了一个监视线程,以每10秒一次的频率运行,对失败的job进行重试。如果job剩余的重试次数大于0,就会job进行重试,并把发送告警信息。线程的定义如下:监视器线程privateThreadmonitorThread;
这里需要关注的问题是:当admin服务是集群部署时(共用一个数据库),怎么避免一个job被多个实例多次重试?需要有一个分布式锁。加锁
在这个线程中,它利用数据库执行UPDATE语句时会加上互斥锁的特性,使用了基于数据库的分布式锁,代码如下所示:UPDATE语句给该条记录加互斥锁,如果能加上,说明没有其他线程在修改该记录,也说明该记录还没被修改过设置新值1,表示该记录已经被加锁了intlockRetXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateAlarmStatus(failLogId,0,1);if(lockRet1){continue;}解锁xlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateAlarmStatus(failLogId,1,newAlarmStatus);
在这个语句中,会把jobLog的状态设置为1,这是一个无效状态值,当其他线程通过有效状态值来搜索失败记录时,会略过该记录,这样该记录就不会被其他线程重试,达到的分布式锁的功能(这个锁是一个行锁)。或者说,1状态类似于java中的对象头的锁标志位,表明该记录已经被加锁了,其他线程会忽略该记录。
问题:这里的加锁解锁代码有什么问题?
在try代码块中加锁和解锁,如果加锁后重试时抛出异常,会导致该记录永远无法解锁。所以,应该在finnally块中执行解锁操作,或者使用redis给锁加一个过期时间来实现分布式锁。重试
从失败的日志中取出jobId,查询出对应的jobInfo数据,如果日志中的剩余重试次数大于0,就执行重试。代码如下:取出失败的日志和对应的jobXxlJobLoglogXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。load(failLogId);XxlJobInfoinfoXxlJobAdminConfig。getAdminConfig()。getXxlJobInfoDao()。loadById(log。getJobId());1、重试失败的job,更新日志的triggermsg字段值if(log。getExecutorFailRetryCount()0){调度任务调度JobTriggerPoolHelper。trigger(log。getJobId(),TriggerTypeEnum。RETRY,(log。getExecutorFailRetryCount()1),log。getExecutorShardingParam(),log。getExecutorParam(),null);StringretryMsg
spanstylecolor:F39C12;I18nUtil。getString(jobconftriggertyperetry)span
;log。setTriggerMsg(log。getTriggerMsg()retryMsg);更新logXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateTriggerInfo(log);}2、如果job不为空,说明存在失败的任务,发送告警消息告警状态:0默认、1锁定状态、1无需告警、2告警成功、3告警失败intnewAlarmStatus0;if(info!null){booleanalarmResultXxlJobAdminConfig。getAdminConfig()。getJobAlarmer()。alarm(info,log);newAlarmStatusalarmResult?2:3;}else{newAlarmStatus1;}3、更新jobLog的alarmstatus值XxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateAlarmStatus(failLogId,1,newAlarmStatus);
调度任务使用的就是前面介绍的JobTriggerPoolHelper。trigger方法,最后更新jobLog的alarmstatus值,有两个作用:释放分布式锁;日志记录的alarmstatus被设置为大于0的值,不会再被作为失败日志查询出来(findFailJobLogIds方法的查询条件之一是alarmstatus0),避免了在下一次线程执行时再被重试。JobCompleteHelper
Job完成线程的辅助类
这个类与JobRegistryHelper类似,都有一个线程池、一个线程,通过前面JobRegistryHelper的学习,可以大胆猜测:线程池用来创建线程,处理接收到的请求;线程用来执行执行一些定定时任务。
实际上,该类中线程池和线程的作用就是用来完成一个job。接收回调
当executor接收到admin的调度请求后,会异步执行job,并立刻返回一个回调。
admin接受到回调后,和前面的注册、下线一样,在线程池中创建线程来处理回调,主要是更新job和日志。接收回调请求的线程池privateThreadPoolExecutorcallbackThreadPoolnull;初始化线程池callbackThreadPoolnewThreadPoolExecutor(2,20,30L,TimeUnit。SECONDS,newLinkedBlockingQueue(3000),rnewThread(r,xxljob,adminJobLosedMonitorHelpercallbackThreadPoolr。hashCode()),(r,executor){r。run();logger。warn(xxljob,callbacktoofast,matchthreadpoolrejectedhandler(runnow)。);});
当有回调请求时,publiccallback方法(该方法被AdminBizImpl调用)会在线程池中创建一个线程,遍历回调请求的参数列表,依次处理回调参数,代码如下:在线程池中创建线程处理回调参数publicReturnTStringcallback(ListHandleCallbackParamcallbackParamList){callbackThreadPool。execute(newRunnable(){Overridepublicvoidrun(){for(HandleCallbackParamhandleCallbackParam:callbackParamList){ReturnTStringcallbackResultcallback(handleCallbackParam);。。。}}});returnReturnT。SUCCESS;}privateReturnTStringcallback(HandleCallbackParamhandleCallbackParam){validlogitemXxlJobLoglogXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。load(handleCallbackParam。getLogId());更新log数据,略。。。更新和完成jobXxlJobCompleter。updateHandleInfoAndFinish(log);returnReturnT。SUCCESS;}
callBack的调用顺序:JobApiControllerAdminBizImplpublicallbackprivatecallback。
从代码可以看出,最后调用XxlJobCompleter。updateHandleInfoAndFinish方法完成回调逻辑。更新Job
如果一个job较长时间前被调度,但是一直处于运行中且它所属的executor已经超过90秒没有心跳了,那么可以认为该job已经丢失了,需要把该job结束掉。这个就是线程monitorThread的主要功能。
monitorThread会以60秒一次的频率,从xxljoblog表中找出10分钟前调度、仍处于运行中状态、executor已经下线的job,然后调用XxlJobCompleter。updateHandleInfoAndFinish来更新handler的信息和结束job,代码如下:监视丢失job的线程privateThreadmonitorThread;任务结果丢失处理:调度记录停留在运行中状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记为失败DatelosedTimeDateUtil。addMinutes(newDate(),10);ListLonglosedJobIdsXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。findLostJobIds(losedTime);if(losedJobIds!nulllosedJobIds。size()0){for(LonglogId:losedJobIds){XxlJobLogjobLognewXxlJobLog();jobLog。setId(logId);jobLog。setHandleTime(newDate());设置handlercodejobLog。setHandleCode(ReturnT。FAILCODE);jobLog。setHandleMsg(I18nUtil。getString(jobloglostfail));更新执行信息和结束jobXxlJobCompleter。updateHandleInfoAndFinish(jobLog);}}
从代码可以看出,上面的两个功能最后都调用了XxlJobCompleter。updateHandleInfoAndFinish方法,关于该方法的介绍,可以看后面XxlJobCompleter部分的介绍,这里不详细展开。JobLogReportHelper
Job日志统计辅助类
如果去看XxlJobTrigger。triger方法,会发现每次调度job时,都会先新增一个jobLog记录,这也是为什么JobFailMonitorHelper中的线程在重试时,先查询jobLog的原因。
JobLog作为job的调度记录,还可以用来统计一段时间内job的调度次数、成功数等;另外,会清理超出有效期(配置的参数logretentiondays)的日志,避免日志数据过大。很显然,这又是一个自动任务,可以使用一个线程定时完成。
该类持有一个线程变量,线程以每分钟一次的频率,执行两个操作:统计一段时间的job数据,主要统计指标有:总的调度次数、处于调度运行中的次数、调度失败的次数、调度成功的次数;清理过期的日志数。
在线程run方法的前半部分,线程会统计3天内,每天的调度次数、运行次数、成功运行数、失败次数;然后更新或新增xxljoblogreport表的数据。清理日志
在线程run方法的后半部分,线程按天对日志进行清理,如果当前时间与上次清理的时间相隔超过一天,就会清理日志记录,代码如下:根据上次执行时间、配置的过期参数,来决定是否执行清理上次清理时间与当前超过1天才清理longlastCleanLogTime0;if(XxlJobAdminConfig。getAdminConfig()。getLogretentiondays()0System。currentTimeMillis()lastCleanLogTime2460601000){清理的开始时间。。。略开始清理日志ListLonglogIdsnull;do{logIdsXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。findClearLogIds(0,0,clearBeforeTime,0,1000);if(logIds!nulllogIds。size()0){XxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。clearLog(logIds);}}while(logIds!nulllogIds。size()0);更新执行清理操作的时间lastCleanLogTimeSystem。currentTimeMillis();}
问题:为什么要用lastCleanLogTime记录上一次的清理时间?每次执行时,不能直接清理一天前创建的数据吗?
如果不使用参数lastCleanLogTime来记录上次清理的时间,只是清理一天前创建的数据记录。那么该线程每分钟执行一次时,都会删除前天当前时刻的数据,导致前一年的数不完整。
使用参数lastCleanLogTime来记录上次清理的时间,并且与当前时间相差超过一天时才清理,能保证前一天的日志是完整的。
问题:为什么每次只删除1000条日志?
不明白为什么清理日志时,不是一次性删除全部的过期日志,而是每次删除1000条。按理说,这些旧的日志数据应该已经不在bufferpool中了,triggertime字段又是普通索引,那么DELETE操作会先更新到changebuffer中,之后再合并。现在先查询再删除,相当于多了一次IO且没有使用到changebuffer。JobScheduleHelper
Job调度辅助类
admin服务是用来管理和调度job的,用户也可以在它的管理后台新建一个job,配置CRON和JobHandler,然后admin服务就会按照配置的参数来调度job。很显然,这种自动化工作也是由线程定时执行的。
1、如果使用线程调度Job,存在的第一个问题是:如果某个Job在调度时比较耗时,就可能阻塞后续的Job,导致后续job的执行有延迟,怎么解决这个问题?
在前面JobTriggerPoolHelper我们已经知道,admin在调度job时是使用线程池、线程异步执行调度任务,避免了主线程的阻塞。
2、使用线程定时调度job,存在的第二个问题是:怎么保证job在指定的时间执行,而不会出现大量延迟?
admin使用预读的方式,提前读取在未来一段时间内要执行的job,提前取到内存中,并使用时间轮算法按时间分组job,把未来要执行的job下一个时间段执行。
3、还隐藏第三个问题:admin服务是可以多实例部署的,在这种情况下该怎么避免一个job被多个实例重复调度?
admin把一张数据表作为分布式锁来保证只有一个admin实例能执行job调度,又通过随机sleep线程一段时间,来降低线程之间的竞争。
下面我们就通过代码来了解xxljob是怎么解决上述问题的。调度线程
在该类中,定义了一个调度线程,用来调度要执行的job和已经过期一段时间的job,定义代码如下:预读的毫秒数publicstaticfinallongPREREADMS5000;预读和调度过期任务的线程privateThreadscheduleThread;预读
下面代码中的pushTimeRing,是把job添加到一个map对象ringData中,然后让另一个线程从该map对象中取出,再次调度
该线程会预读出下次执行时间now5000毫秒内的部分job,根据它们下一次执行时间划分成三段,执行三种不同的逻辑。
1、下次执行时间在(,now5000)范围内
说明过期时间已经大于5000毫秒,这时如果过期策略要求调度,就调度一次。代码如下:if(nowTimejobInfo。getTriggerNextTime()PREREADMS){logger。warn(xxljob,schedulemisfire,jobIdjobInfo。getId());MisfireStrategyEnummisfireStrategyEnumMisfireStrategyEnum。match(jobInfo。getMisfireStrategy(),MisfireStrategyEnum。DONOTHING);if(MisfireStrategyEnum。FIREONCENOWmisfireStrategyEnum){调度一次JobTriggerPoolHelper。trigger(jobInfo。getId(),TriggerTypeEnum。MISFIRE,1,null,null,null);logger。debug(xxljob,schedulepushtrigger:jobIdjobInfo。getId());}更新下一次执行时间refreshNextValidTime(jobInfo,newDate());}
2、下次执行时间在〔now5000,now)范围内
说明过期时间小于5000毫秒,只能算是延迟不能算是过期,直接调度一次,代码如下:if(nowTimejobInfo。getTriggerNextTime()){1、调度一次JobTriggerPoolHelper。trigger(jobInfo。getId(),TriggerTypeEnum。CRON,1,null,null,null);logger。debug(xxljob,schedulepushtrigger:jobIdjobInfo。getId());2、更新下一次调度时间refreshNextValidTime(jobInfo,newDate());3、如果当前job处于可以被调度的状态,且下一次执行时间在5000毫秒内,就记录下jobId,等待后面轮询if(jobInfo。getTriggerStatus()1nowTimePREREADMSjobInfo。getTriggerNextTime()){下次调度的时刻:秒intringSecond(int)((jobInfo。getTriggerNextTime()1000)60);保存进ringData中pushTimeRing(ringSecond,jobInfo。getId());刷新下一次的调度时间refreshNextValidTime(jobInfo,newDate(jobInfo。getTriggerNextTime()));}}
如果job的下一次执行时间在5000毫秒以内,为了省下下次预读的IO耗时,这里会记录下jobid,等待后面的调度。
3、下次执行时间在〔now,now5000)范围内
说明还没到执行时间,先记录下jobid,等待后面的调度,代码如下:intringSecond(int)((jobInfo。getTriggerNextTime()1000)60);pushTimeRing(ringSecond,jobInfo。getId());refreshNextValidTime(jobInfo,newDate(jobInfo。getTriggerNextTime()));
上面的3个步骤结束后,会更新jobInfo的triggerlasttime、triggernexttime、triggerstatus字段:更新job数据for(XxlJobInfojobInfo:scheduleList){XxlJobAdminConfig。getAdminConfig()。getXxlJobInfoDao()。scheduleUpdate(jobInfo);}
可以看到,通过预读,一方面会把过期一小段时间的job执行一遍,另一方面会把未来一小段时间内要执行的job取出,保存进一个map对象ringData中,等待另一个线程调度。这样就避免了某些job到了时间还没执行。分布式锁
因为admin是可以多实例部署的,所以在调度job时,需要考虑怎么避免job被多次调度。
xxljob在前面JobFailMonitorHelper中遍历失败的job时,会对每个job设置一个无效的状态作为分布式行锁,如果设置失败就跳过。而在这里,如果还使用该方法,有可能出现,一个job被设置为无效状态后,线程就崩溃了,导致该job永远无法被调度。因此,要尽量避免对job状态的修改。
在这里,admin服务使用一张表xxljoblock作为分布式锁,每个admin实例都要先尝试获取该表的锁,获取成功才能继续执行;同时,为了降低不同实例之间的竞争,会在线程开始执勤随机sleep一段时间。
如何获取分布式锁?
在线程中会开启一个事务,设置为手动提交,然后对表xxljoblock执行FORUPDATE查询。如果该线程执行语句成功,其他实例的线程就会排队等待该表的锁,实现了分布式锁功能。代码如下:获取数据库链接,通过SELECTFORUPDATE来尝试获取X锁在事务提交前一直持有该锁,其他实例的线程想获取该锁就会失败,并且会排队等待,直到第一个事务提交释放或锁超时connXxlJobAdminConfig。getAdminConfig()。getDataSource()。getConnection();connAutoCommitconn。getAutoCommit();conn。setAutoCommit(false);preparedStatementconn。prepareStatement(selectfromxxljoblockwherelocknameschedulelockforupdate);preparedStatement。execute();提交事务,释放X锁if(conn!null){try{conn。commit();}catch(SQLExceptione){。。。}try{conn。setAutoCommit(connAutoCommit);}catch(SQLExceptione){。。。}try{conn。close();}catch(SQLExceptione){。。。}}
怎么降低锁的竞争?
为了降低锁竞争,在线程开始前会先sleep40005000毫秒的随机值(不能大于5000毫秒,5000毫秒是预读的时间范围);在线程结束当前循环时,会根据耗时和是否有预读数据,选择不同的sleep策略:耗时超过1000毫秒,不sleep,直接开始下一次循环;耗时小于1000毫秒,根据是否有预读数据,sleep一个大小不同的随机时长:有预读数据,sleep时间短一些,在01000毫秒范围内;没有预读数据,sleep时间长一些,在04000毫秒范围内;
代码如下:try{随机sleep40005000毫秒,通过这种方式,降低多实例部署时对锁的竞争这里也看出来,最多部署5000台实例。。TimeUnit。MILLISECONDS。sleep(5000System。currentTimeMillis()1000);}catch(InterruptedExceptione){if(!scheduleThreadToStop){logger。error(e。getMessage(),e);}}耗时超过1000毫秒,就不sleep不超过1000ms,就sleep一个随机时长longcostSystem。currentTimeMillis()start;if(cost1000){try{没有预读数据,就sleep时间长一点;有预读数据,就sleep时间短一些TimeUnit。MILLISECONDS。sleep((preReadSuc?1000:PREREADMS)System。currentTimeMillis()1000);}catch(InterruptedExceptione){if(!scheduleThreadToStop){logger。error(e。getMessage(),e);}}}ringThread:时间轮
在前面的线程中,对即将要开始的job,不是立刻调度,而是按照执行的时刻(秒),把jobid保存进一个map中,然后由ringThread线程按时刻进行调度,这只典型的时间轮算法。代码如下:调度线程2privateThreadringThread;按时刻(秒)调度jobprivatevolatilestaticMapInteger,ListIntegerringDatanewConcurrentHashMap();调度过程ListIntegerringItemDatanewArrayList();每次取出2个时刻的job来调度intnowSecondCalendar。getInstance()。get(Calendar。SECOND);for(inti0;i2;i){ListIntegertmpDataringData。remove((nowSecond60i)60);if(tmpData!null){ringItemData。addAll(tmpData);}}遍历jobId,执行调度if(ringItemData。size()0){for(intjobId:ringItemData){JobTriggerPoolHelper。trigger(jobId,TriggerTypeEnum。CRON,1,null,null,null);}ringItemData。clear();}
每次轮询调度时,只取出当前时刻(秒)、前一秒内的job,不会去调度与现在相隔太久的job。
在执行轮询调度前,有一个时间在01000毫秒范围内的sleep。如果没有这个sleep,该线程会一直执行,而ringData中当前时刻(秒)的数据可能已经为空,会导致大量无效的操作;增加了这个sleep之后,可以避免这种无效的操作。之所以sleep时间在1000毫秒以内,是因为调度时刻最小精确到秒,一秒的sleep可以避免job的延迟。try{TimeUnit。MILLISECONDS。sleep(1000System。currentTimeMillis()1000);}catch(InterruptedExceptione){if(!ringThreadToStop){logger。error(e。getMessage(),e);}}
问题:为什么在时间轮调度时,没有加分布式锁?
因为在前面的scheduleThread线程中,最后一个操作是把job的nexttriggertime值更新为大于now5000毫秒,其他admin实例scheduleThread线程的查询条件是:nexttriggertimenow5000,不会查询出这里调度的job,所以不需要加分布式锁。
至此,XxlJobSchedulerinit方法的作用我们介绍完毕,下面我们简单介绍一下XxlJobSchedulerdestroy方法XxlJobSchedulerdestroy
destroy方法很简单,就是销毁前面初始化的线程池和线程,它销毁的顺序与前面启动的顺序相反。
代码如下:销毁,销毁过程与init顺序相反publicvoiddestroy()throwsException{1、销毁调度线程JobScheduleHelper。getInstance()。toStop();2、销毁日志统计和清理线程JobLogReportHelper。getInstance()。toStop();3、销毁丢失job监视器线程JobCompleteHelper。getInstance()。toStop();4、销毁失败job监视器线程JobFailMonitorHelper。getInstance()。toStop();adminregistrystopJobRegistryHelper。getInstance()。toStop();admintriggerpoolstopJobTriggerPoolHelper。toStop();}
因为各个toStop方法都很相似,所以我们只介绍JobScheduleHelper的toStop方法。
该方法的步骤如下:
1、设置停止标志位为true;
2、sleep一段时间,让出CPU时间片给线程执行任务;
3、如果线程不是终止状态(线程正在sleep),中断它;
4、线程执行join方法,直到线程结束,执行最后一次。
代码如下:scheduleThreadToStoptrue;给线程1s的时间去执行任务try{TimeUnit。SECONDS。sleep(1);}catch(InterruptedExceptione){logger。error(e。getMessage(),e);}如果线程不是终止状态,就让它执行完所有任务if(scheduleThread。getState()!Thread。State。TERMINATED){scheduleThread。interrupt();try{scheduleThread。join();}catch(InterruptedExceptione){logger。error(e。getMessage(),e);}}
至此,JobScheduleHelper的主要功能就介绍完了,可以看出,admin服务在启动时,启动了多个线程池和线程,异步执行任务和异步响应executor的请求。
下面,我们介绍前面涉及到的XxlJobTrigger和XxlJobCompleter。XxlJobTrigger
调度job时的封装类
XxlJobTrigger是调度job时的封装类,它主要工作就是接受传入的jobId、调度参数等,查询对应的jobGroup、jobInfo,然后调用ExecutorBiz对象来执行调度(run方法)。
注意:这个类本身不会执行http请求,http请求是在core包下的工具类XxlJobRemotingUtil中执行的。
该类中三个核心方法及其调用关系如下:triggerprocessTriggerrunExecutor,trigger
该方法的功能比较简单,就是根据传入的参数查询jobGroup和jobInfo对象,设置相关的字段值,然后调用processTrigger方法。processTrigger
该方法的主要工作分为以下几步:
1、保存一条调度日志;
2、从jobInfo、jobGroup中取出字段值,构造TriggerParam对象;
3、根据jobInfo的路由策略,从jobGroup中取出要调度的executor地址;
4、调用runExecutor方法执行调度;
5、保存调度参数、设置调度信息、更新日志。
这里不会修改jobInfo、jobGroup对象的字段值,只取出字段值来使用,对这两个对象字段的修改,是在前一步trigger方法中进行的。runExecutor
该方法会执行调度,并返回调度结果,它的核心代码如下:ExecutorBizexecutorBizXxlJobScheduler。getExecutorBiz(address);runResultexecutorBiz。run(triggerParam);
这里使用XxlJobScheduler类取出ExecutorBiz对象,以懒加载的方式给每个address创建一个ExecutorBiz对象,代码如下:privatestaticConcurrentMapString,ExecutorBizexecutorBizRepositorynewConcurrentHashMapString,ExecutorBiz();publicstaticExecutorBizgetExecutorBiz(Stringaddress)throwsException{validif(addressnulladdress。trim()。length()0){returnnull;}loadcacheaddressaddress。trim();ExecutorBizexecutorBizexecutorBizRepository。get(address);if(executorBiz!null){returnexecutorBiz;}setcacheexecutorBiznewExecutorBizClient(address,XxlJobAdminConfig。getAdminConfig()。getAccessToken());executorBizRepository。put(address,executorBiz);returnexecutorBiz;}
吐槽一句:这个功能完全可以放在XxlJobTrigger类中、或者封装在ExecutorBiz内部,不知道为什么要放在XxlJobScheduler,平白无故多了一层调用。
可以看出,该类中的三个方法其实可以归类为:preexecutepost,在执行前、执行时、执行后做一些前置和收尾工作。XxlJobCompleter
job的完成类
该类在前面JobCompleteHelper中被使用,最终job的完成就是在该类中执行的,该类有两个主要方法:updateHandleInfoAndFinish:公共方法,调用finishJob方法和更新日志;finishJob:私有方法,执行子任务和更新日志;
下面主要介绍finishJob方法。finishJob
finishJob的主要功能是:如果当前任务执行成功了,就调度它的所有子任务,最后把子任务的调度消息添加到当前job的日志中。代码如下:privatestaticvoidfinishJob(XxlJobLogxxlJobLog){1、job执行成功,开始调度子任务handlesuccess,totriggerchildjobStringBuildertriggerChildMsgnull;if(XxlJobContext。HANDLECODESUCCESSxxlJobLog。getHandleCode()){XxlJobInfoxxlJobInfoXxlJobAdminConfig。getAdminConfig()。getXxlJobInfoDao()。loadById(xxlJobLog。getJobId());if(xxlJobInfo!nullxxlJobInfo。getChildJobId()!nullxxlJobInfo。getChildJobId()。trim()。length()0){2、遍历子任务IDString〔〕childJobIdsxxlJobInfo。getChildJobId()。split(,);for(inti0;ichildJobIds。length;i){intchildJobId(childJobIds〔i〕!nullchildJobIds〔i〕。trim()。length()0isNumeric(childJobIds〔i〕))?Integer。parseInt(childJobIds〔i〕):1;if(childJobId0){3、调度子任务JobTriggerPoolHelper。trigger(childJobId,TriggerTypeEnum。PARENT,1,null,null,null);ReturnTStringtriggerChildResultReturnT。SUCCESS;4、添加日志信息。。。。略}}}5、保存子任务的调度消息到日志if(triggerChildMsg!null){xxlJobLog。setHandleMsg(xxlJobLog。getHandleMsg()triggerChildMsg);}}
需要注意的是:
1、这里依赖于JobTriggerPoolHelper来调度job,所以在JobCompleteHelper的监视线程开始时,有一个50秒的等待,就是等待JobTriggerPoolHelper启动完成;
2、在finishJob方法中,调度子任务的时候,默认子任务的调度结果是成功,注意,这里是指调度这个行为是成功的,而不是指子任务执行是成功的。总结
1、XxlJobAdminConfig作为admin服务的启动入口,要尽可能保持简洁,作用类似于一个仓库,来管理和持有所有的类和对象,并不会去启动具体的线程,它只需要按下启动器的按钮就可以了;
2、XxlJobScheduler是admin服务的启动器类,它会调用各个辅助类(xxxHelper)来启动对应的线程;
3、对外的接口,比如调度job、接收注册或下线等,都是使用线程池线程的异步方式实现,避免job对主线程的阻塞;
4、对自动任务类的功能,都是使用线程定时执行;参考阅读
XXLJOB分布式调度框架全面详解:https:juejin。cnpost6948397386926391333
时间轮算法:https:spongecaptain。coolpostwidgettimingwheel
一个开源的时间轮算法介绍:https:spongecaptain。coolpostwidgettimingwheel2