游戏电视苹果数码历史美丽
投稿投诉
美丽时装
彩妆资讯
历史明星
乐活安卓
数码常识
驾车健康
苹果问答
网络发型
电视车载
室内电影
游戏科学
音乐整形

XXLadmin源码解析

  xxljob的admin服务是xxljob的调度中心,负责管理和调度注册的job,关于xxljob的使用,可以阅读参考阅读中的《XXLJOB分布式调度框架全面详解》,这里主要是介绍admin中的源码。
  admin服务除了管理页面上的一些接口外,还有一些核心功能,比如:
  1、根据job的配置,自动调度job;
  2、接收executor实例的请求,实现注册和下线;
  3、监视失败的job,进行重试;
  4、结束一些异常的job;
  5、清理和统计日志;
  这些功能都是在admin服务启动后,在后台自动运行的,下面将详细介绍admin服务这些功能的实现。XxlJobAdminConfig
  admin的配置类
  XxlJobAdminConfig是admin服务的配置类,在admin服务启动时,它除了配置admin服务的一些参数外,还会启动admin服务的所有后台线程。属性
  该类的属性主要分为5类:
  1、配置文件中的参数,比如accessToken;
  2、DAO层各个数据表的mapper;
  3、Spring容器中的一些Bean,比如JobAlarmer、DataSource等;
  4、私有变量XxlJobScheduler对象;privateXxlJobSchedulerxxlJobScheduler;
  5、私有静态变量adminConfig,指向实例自身。privatestaticXxlJobAdminConfigadminConfignull;publicstaticXxlJobAdminConfiggetAdminConfig(){returnadminConfig;}方法
  该类有两个重要方法,分别实现自接口InitializingBean、DisposableBean,作用如下:afterPropertiesSet方法,在Spring容器中Bean初始化完成之后,在该方法中进行初始化;destroy方法,在容器销毁Bean时,会执行销毁操作;
  这两个方法分别调用了XxlJobScheduler对象的init、destroy方法,源码如下:OverridepublicvoidafterPropertiesSet()throwsException{adminConfigthis;xxlJobSchedulernewXxlJobScheduler();xxlJobScheduler。init();}Overridepublicvoiddestroy()throwsException{xxlJobScheduler。destroy();}
  XxlJobAdminConfig作为admin服务的配置类,作用就是在Spring容器启动时,调用XxlJobScheduler的初始化方法,来初始化和启动admin服务的功能。XxlJobScheduler
  XxlJobScheduler的作用就是调用各个辅助类(xxxHelper)来启动和结束不同的线程和功能,初始化方法init的代码如下:
  如果把XxlJobScheduler看做是一个启动器,那么init方法就是启动按钮,XxlJobAdminConfig的作用就是按下这个按钮。publicvoidinit()throwsException{0、初始化国际化消息,不是很重要忽略initI18n();1、初始化调度器的线程池JobTriggerPoolHelper。toStart();2、启动注册监视器线程JobRegistryHelper。getInstance()。start();3、启动失败job监视器线程,查询失败日志进行重试JobFailMonitorHelper。getInstance()。start();4、启动丢失job监视器线程,一些job发出调度指令后,一直没有响应,状态一直是运行中JobCompleteHelper。getInstance()。start();5、启动日志统计和清理线程JobLogReportHelper。getInstance()。start();6、启动调度线程,定时调度jobJobScheduleHelper。getInstance()。start();logger。info(initxxljobadminsuccess。);}
  下面我们主要介绍init中各个类及其作用,最后再简单一下介绍destroy的作用。JobTriggerPoolHelper
  Trigger线程池的辅助类:管理Trigger线程池、添加trigger线程到线程池
  当admin服务向executor实例发出一个调度请求来执行job时,会调用XxlJobTrigger。trigger()方法把要传输的参数(比如jobid、jobHandler、joblogid、阻塞策略等,包装成TriggerParam对象)传给ExecutorBiz对象来执行一次调度。
  xxljob对调度过程做了两个优化:每次发出调度请求时,会新建一个线程,异步执行XxlJobTrigger的方法;在新建线程时,会根据执行XxlJobTrigger方法的耗时,选择不同的线程池;属性和start
  初始化线程池
  JobTriggerPoolHelper在toStart方法中初始化了它的两个线程池属性,代码如下:快速、慢速线程池,分别执行调度任务不一样的任务,实现隔离,避免相互阻塞privateThreadPoolExecutorfastTriggerPoolnull;privateThreadPoolExecutorslowTriggerPoolnull;publicvoidstart(){fastTriggerPoolnewThreadPoolExecutor(10,至少200XxlJobAdminConfig。getAdminConfig()。getTriggerPoolFastMax(),60L,TimeUnit。SECONDS,newLinkedBlockingQueue(1000),rnewThread(r,xxljob,adminJobTriggerPoolHelperfastTriggerPoolr。hashCode()));slowTriggerPoolnewThreadPoolExecutor(10,至少100XxlJobAdminConfig。getAdminConfig()。getTriggerPoolSlowMax(),60L,TimeUnit。SECONDS,newLinkedBlockingQueue(2000),rnewThread(r,xxljob,adminJobTriggerPoolHelperslowTriggerPoolr。hashCode()));}
  每次有调度请求时,就会在这两个线程池中创建线程,创建线程的逻辑在addTrigger方法中。addTrigger
  新建线程,调用XxlJobTrigger。trigger()方法
  不同job存在执行时长的差异,为了避免不同耗时job之间相互阻塞,xxljob根据job的响应时间,对job进行了区分,主要体现在:如果job耗时短,就在fastTriggerPool线程池中创建线程;如果job耗时长且调用频繁,就在slowTriggerPool线程池中创建线程;
  如果快job与调用频繁的慢job在同一个线程池中创建线程,慢job会占用大量的线程,导致快job线程不能及时运行,降低了线程池和线程的利用率。xxljob通过快慢隔离,避免了这个问题。
  问题:如果快慢job使用同一个线程池时,慢job占用了线程,导致快job线程不能及时运行,正常情况下,我们的反应是增加线程池的线程数,这样做能否解决问题?
  不能,因为慢job还是会占用大量线程,抢占了快job的线程资源;增加线程池中的线程数不但没有提升利用率,还会导致大量线程看空闲,利用率反而降低了。最好的方法还是用两个线程池把两者隔离,可以合理地使用各自线程池的资源。
  为了记录慢job的超时次数,代码中使用一个map(变量jobTimeoutCountMap)来记录一分钟内job超时次数,key值是jobid,value是超时次数。在调用XxlJobTrigger。trigger()方法之前,会先判断map中,该jobid的超时次数是否大于10,如果大于10,就是使用slowTriggerPool,代码如下:属性变量privatevolatileConcurrentMapInteger,AtomicIntegerjobTimeoutCountMapnewConcurrentHashMap();选择线程池,如果在一分钟内调度超过10次,使用slowTriggerPoolThreadPoolExecutortriggerPoolfastTriggerPool;AtomicIntegerjobTimeoutCountjobTimeoutCountMap。get(jobId);if(jobTimeoutCount!nulljobTimeoutCount。get()10){triggerPoolslowTriggerPool;}
  调用XxlJobTrigger。trigger()方法后,根据两个值来更新jobTimeoutCountMap的值:当前时间与上次调用是否在一分钟以内,如果不在一分钟以内,就清空map;本次XxlJobTrigger。trigger()的调用是否超过500毫秒,如果超过500毫秒,就在map中增加jobid的超时次数;
  和上面的代码相结合,一个job在一分钟内有10次调用超过500毫秒,就认为该job是一个频繁调度且耗时的job。
  代码如下:属性变量,初始值等于JobTriggerPoolHelper对象构造时的分钟数每次调用XxlJobTrigger。trigger()方法时,值等于上一次调用的分钟数privatevolatilelongminTimSystem。currentTimeMillis()60000;当前时间的分钟数,如果和前一次调用不在同一分钟内,就清空jobTimeoutCountMaplongminTimnowSystem。currentTimeMillis()60000;if(minTim!minTimnow){minTimminTimnow;jobTimeoutCountMap。clear();}开始调用XxlJobTrigger。trigger()的时间longstartSystem。currentTimeMillis();。。。调用XxlJobTrigger。trigger()方法XxlJobTrigger。trigger(jobId,triggerType,failRetryCount,executorShardingParam,executorParam,addressList);如果用时超过500毫秒,就增加一次它的慢调用次数longcostSystem。currentTimeMillis()start;if(cost500){AtomicIntegertimeoutCountjobTimeoutCountMap。putIfAbsent(jobId,newAtomicInteger(1));if(timeoutCount!null){timeoutCount。incrementAndGet();}}
  XxlJobTrigger。trigger()方法在下面的XxlJobTrigger类中有详细介绍,这里只需要知道它会对一个job发起一次执行请求。
  在该类中,属性变量minTim和jobTimeoutCountMap都使用volatile来修饰,保证了并发调用addTrigger时数据的一致性和可见性。
  问题:为什么要每分钟清空一次map中的数据?
  admin服务发起job调度请求时,是在静态方法publicstaticvoidtrigger()中调用静态变量privatestaticJobTriggerPoolHelperhelper的addTrigger方法来发起请求的。minTim和jobTimeoutCountMap虽然不是static修饰的,但可以看做是全局唯一的(因为持有它们的对象是全局唯一的),因此这两个参数维护的是admin服务全局的调度时间和超时次数,为了避免记录的数据量过大,需要每分钟清空一次数据的操作。JobRegistryHelper
  executor注册和下线的辅助类
  admin服务提供了接口给executor来注册和下线,另外,当executor长时间(90秒)没有发心跳时,要把executor自动下线。前一个功能通过暴露一个接口来接收请求,后一个功能需要开启一个线程,定时更新过期executor的状态。
  xxljob为了提升admin服务的性能,在前一个功能的接口接收到executor的请求时,不是同步执行,而是在线程池中开启一个线程,异步执行executor的注册和下线请求。
  JobRegistryHelper类就负责管理这个线程池和定时线程的。注册和下线
  线程池的定义和初始化代码如下:注册或移除executor的线程池privateThreadPoolExecutorregistryOrRemoveThreadPoolnull;注册或移除线程池registryOrRemoveThreadPoolnewThreadPoolExecutor(2,10,30L,TimeUnit。SECONDS,newLinkedBlockingQueue(2000),rnewThread(r,xxljob,adminJobRegistryMonitorHelperregistryOrRemoveThreadPoolr。hashCode()),(r,executor){r。run();logger。warn(xxljob,registryorremovetoofast,matchthreadpoolrejectedhandler(runnow)。);});
  线程池的核心线程数是2,最大线程数是10,允许一个2000的队列,如果executor实例很多,会导致注册延迟的。当然,一般不会把2000个executor注册到同一个admin服务。
  executor实例在发起注册和下线请求时,会调用AdminBizImpl类的对应方法,该类的方法如下:
  可以看到,AdminBizImpl类的两个方法都是调用了JobRegistryHelper方法来实现,其中JobRegistryHelper。registry方法代码如下(registryRemove代码与之相似):publicReturnTStringregistry(RegistryParamregistryParam){校验参数if(!StringUtils。hasText(registryParam。getRegistryGroup())!StringUtils。hasText(registryParam。getRegistryKey())!StringUtils。hasText(registryParam。getRegistryValue())){returnnewReturnT(ReturnT。FAILCODE,IllegalArgument。);}在线程池中创建线程registryOrRemoveThreadPool。execute((){updateintretXxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。registryUpdate(registryParam。getRegistryGroup(),registryParam。getRegistryKey(),registryParam。getRegistryValue(),newDate());update失败,insertif(ret1){XxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。registrySave(registryParam。getRegistryGroup(),registryParam。getRegistryKey(),registryParam。getRegistryValue(),newDate());刷新,空方法freshGroupRegistryInfo(registryParam);}});returnReturnT。SUCCESS;}
  这两个方法是通过在线程池registryOrRemoveThreadPool中创建线程来异步执行请求,然后把数据更新或新建到数据表xxljobregistry中。更新和管理Jobgroup
  当executor注册到admin服务后(数据入库到xxljobregistry表),是不会在页面上显示的,需要要用户手动添加jobgroup数据(添加到xxljobgroup表),admin服务会自动把用户添加的jobgroup数据与xxljobregistry数据关联。这就需要admin定时从xxljobgroup表读取数据,关联xxljobregistry表和xxljobgroup表的数据。
  这个功能是与executor自动下线功能在同一个线程中实现,该线程的主要逻辑是:从xxljobgroup表查询出自动设置address的group列表,如果group列表不为空,才继续向下执行;从xxljobregistry表删除不再存活(90秒内都没有更新)的记录,避免无效记录影响后续操作;从xxljobregistry表取出存活的记录,根据appName设置xxljobgroup记录的addresslist值,多个address使用逗号拼接;sleep30秒,这个线程每30秒执行一次。
  相关代码如下:注册监视器线程privateThreadregistryMonitorThread;停止标志位privatevolatilebooleantoStopfalse;自动注册的jobgroupListXxlJobGroupgroupListXxlJobAdminConfig。getAdminConfig()。getXxlJobGroupDao()。findByAddressType(0);删除已经下线(90秒内没有心跳)的注册adminexecutorListIntegeridsXxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。findDead(RegistryConfig。DEADTIMEOUT,newDate());if(ids!nullids。size()0){XxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。removeDead(ids);}刷线还在线(90秒内有心跳)的adminexecutor的地址mapListXxlJobRegistrylistXxlJobAdminConfig。getAdminConfig()。getXxlJobRegistryDao()。findAll(RegistryConfig。DEADTIMEOUT,newDate());HashMapString,ListStringappAddressMapnewHashMap(list。size());略。。。每30秒执行一次TimeUnit。SECONDS。sleep(RegistryConfig。BEATTIMEOUT);
  从这里可以看出,如果是对外接口(接收请求等)的功能,使用线程池和异步线程来实现;如果是一些自动任务,则是通过一个线程来定时执行。JobFailMonitorHelper
  Job执行失败的监视线程辅助类
  如果一个Job调度后,没有响应返回,需要定时重试。作为一种自动执行的任务,很显然可以像前面JobRegistryHelper一样,使用一个线程定时重试。
  在这个类中,定义了一个监视线程,以每10秒一次的频率运行,对失败的job进行重试。如果job剩余的重试次数大于0,就会job进行重试,并把发送告警信息。线程的定义如下:监视器线程privateThreadmonitorThread;
  这里需要关注的问题是:当admin服务是集群部署时(共用一个数据库),怎么避免一个job被多个实例多次重试?需要有一个分布式锁。加锁
  在这个线程中,它利用数据库执行UPDATE语句时会加上互斥锁的特性,使用了基于数据库的分布式锁,代码如下所示:UPDATE语句给该条记录加互斥锁,如果能加上,说明没有其他线程在修改该记录,也说明该记录还没被修改过设置新值1,表示该记录已经被加锁了intlockRetXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateAlarmStatus(failLogId,0,1);if(lockRet1){continue;}解锁xlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateAlarmStatus(failLogId,1,newAlarmStatus);
  在这个语句中,会把jobLog的状态设置为1,这是一个无效状态值,当其他线程通过有效状态值来搜索失败记录时,会略过该记录,这样该记录就不会被其他线程重试,达到的分布式锁的功能(这个锁是一个行锁)。或者说,1状态类似于java中的对象头的锁标志位,表明该记录已经被加锁了,其他线程会忽略该记录。
  问题:这里的加锁解锁代码有什么问题?
  在try代码块中加锁和解锁,如果加锁后重试时抛出异常,会导致该记录永远无法解锁。所以,应该在finnally块中执行解锁操作,或者使用redis给锁加一个过期时间来实现分布式锁。重试
  从失败的日志中取出jobId,查询出对应的jobInfo数据,如果日志中的剩余重试次数大于0,就执行重试。代码如下:取出失败的日志和对应的jobXxlJobLoglogXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。load(failLogId);XxlJobInfoinfoXxlJobAdminConfig。getAdminConfig()。getXxlJobInfoDao()。loadById(log。getJobId());1、重试失败的job,更新日志的triggermsg字段值if(log。getExecutorFailRetryCount()0){调度任务调度JobTriggerPoolHelper。trigger(log。getJobId(),TriggerTypeEnum。RETRY,(log。getExecutorFailRetryCount()1),log。getExecutorShardingParam(),log。getExecutorParam(),null);StringretryMsg
  spanstylecolor:F39C12;I18nUtil。getString(jobconftriggertyperetry)span
  ;log。setTriggerMsg(log。getTriggerMsg()retryMsg);更新logXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateTriggerInfo(log);}2、如果job不为空,说明存在失败的任务,发送告警消息告警状态:0默认、1锁定状态、1无需告警、2告警成功、3告警失败intnewAlarmStatus0;if(info!null){booleanalarmResultXxlJobAdminConfig。getAdminConfig()。getJobAlarmer()。alarm(info,log);newAlarmStatusalarmResult?2:3;}else{newAlarmStatus1;}3、更新jobLog的alarmstatus值XxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。updateAlarmStatus(failLogId,1,newAlarmStatus);
  调度任务使用的就是前面介绍的JobTriggerPoolHelper。trigger方法,最后更新jobLog的alarmstatus值,有两个作用:释放分布式锁;日志记录的alarmstatus被设置为大于0的值,不会再被作为失败日志查询出来(findFailJobLogIds方法的查询条件之一是alarmstatus0),避免了在下一次线程执行时再被重试。JobCompleteHelper
  Job完成线程的辅助类
  这个类与JobRegistryHelper类似,都有一个线程池、一个线程,通过前面JobRegistryHelper的学习,可以大胆猜测:线程池用来创建线程,处理接收到的请求;线程用来执行执行一些定定时任务。
  实际上,该类中线程池和线程的作用就是用来完成一个job。接收回调
  当executor接收到admin的调度请求后,会异步执行job,并立刻返回一个回调。
  admin接受到回调后,和前面的注册、下线一样,在线程池中创建线程来处理回调,主要是更新job和日志。接收回调请求的线程池privateThreadPoolExecutorcallbackThreadPoolnull;初始化线程池callbackThreadPoolnewThreadPoolExecutor(2,20,30L,TimeUnit。SECONDS,newLinkedBlockingQueue(3000),rnewThread(r,xxljob,adminJobLosedMonitorHelpercallbackThreadPoolr。hashCode()),(r,executor){r。run();logger。warn(xxljob,callbacktoofast,matchthreadpoolrejectedhandler(runnow)。);});
  当有回调请求时,publiccallback方法(该方法被AdminBizImpl调用)会在线程池中创建一个线程,遍历回调请求的参数列表,依次处理回调参数,代码如下:在线程池中创建线程处理回调参数publicReturnTStringcallback(ListHandleCallbackParamcallbackParamList){callbackThreadPool。execute(newRunnable(){Overridepublicvoidrun(){for(HandleCallbackParamhandleCallbackParam:callbackParamList){ReturnTStringcallbackResultcallback(handleCallbackParam);。。。}}});returnReturnT。SUCCESS;}privateReturnTStringcallback(HandleCallbackParamhandleCallbackParam){validlogitemXxlJobLoglogXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。load(handleCallbackParam。getLogId());更新log数据,略。。。更新和完成jobXxlJobCompleter。updateHandleInfoAndFinish(log);returnReturnT。SUCCESS;}
  callBack的调用顺序:JobApiControllerAdminBizImplpublicallbackprivatecallback。
  从代码可以看出,最后调用XxlJobCompleter。updateHandleInfoAndFinish方法完成回调逻辑。更新Job
  如果一个job较长时间前被调度,但是一直处于运行中且它所属的executor已经超过90秒没有心跳了,那么可以认为该job已经丢失了,需要把该job结束掉。这个就是线程monitorThread的主要功能。
  monitorThread会以60秒一次的频率,从xxljoblog表中找出10分钟前调度、仍处于运行中状态、executor已经下线的job,然后调用XxlJobCompleter。updateHandleInfoAndFinish来更新handler的信息和结束job,代码如下:监视丢失job的线程privateThreadmonitorThread;任务结果丢失处理:调度记录停留在运行中状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记为失败DatelosedTimeDateUtil。addMinutes(newDate(),10);ListLonglosedJobIdsXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。findLostJobIds(losedTime);if(losedJobIds!nulllosedJobIds。size()0){for(LonglogId:losedJobIds){XxlJobLogjobLognewXxlJobLog();jobLog。setId(logId);jobLog。setHandleTime(newDate());设置handlercodejobLog。setHandleCode(ReturnT。FAILCODE);jobLog。setHandleMsg(I18nUtil。getString(jobloglostfail));更新执行信息和结束jobXxlJobCompleter。updateHandleInfoAndFinish(jobLog);}}
  从代码可以看出,上面的两个功能最后都调用了XxlJobCompleter。updateHandleInfoAndFinish方法,关于该方法的介绍,可以看后面XxlJobCompleter部分的介绍,这里不详细展开。JobLogReportHelper
  Job日志统计辅助类
  如果去看XxlJobTrigger。triger方法,会发现每次调度job时,都会先新增一个jobLog记录,这也是为什么JobFailMonitorHelper中的线程在重试时,先查询jobLog的原因。
  JobLog作为job的调度记录,还可以用来统计一段时间内job的调度次数、成功数等;另外,会清理超出有效期(配置的参数logretentiondays)的日志,避免日志数据过大。很显然,这又是一个自动任务,可以使用一个线程定时完成。
  该类持有一个线程变量,线程以每分钟一次的频率,执行两个操作:统计一段时间的job数据,主要统计指标有:总的调度次数、处于调度运行中的次数、调度失败的次数、调度成功的次数;清理过期的日志数。
  在线程run方法的前半部分,线程会统计3天内,每天的调度次数、运行次数、成功运行数、失败次数;然后更新或新增xxljoblogreport表的数据。清理日志
  在线程run方法的后半部分,线程按天对日志进行清理,如果当前时间与上次清理的时间相隔超过一天,就会清理日志记录,代码如下:根据上次执行时间、配置的过期参数,来决定是否执行清理上次清理时间与当前超过1天才清理longlastCleanLogTime0;if(XxlJobAdminConfig。getAdminConfig()。getLogretentiondays()0System。currentTimeMillis()lastCleanLogTime2460601000){清理的开始时间。。。略开始清理日志ListLonglogIdsnull;do{logIdsXxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。findClearLogIds(0,0,clearBeforeTime,0,1000);if(logIds!nulllogIds。size()0){XxlJobAdminConfig。getAdminConfig()。getXxlJobLogDao()。clearLog(logIds);}}while(logIds!nulllogIds。size()0);更新执行清理操作的时间lastCleanLogTimeSystem。currentTimeMillis();}
  问题:为什么要用lastCleanLogTime记录上一次的清理时间?每次执行时,不能直接清理一天前创建的数据吗?
  如果不使用参数lastCleanLogTime来记录上次清理的时间,只是清理一天前创建的数据记录。那么该线程每分钟执行一次时,都会删除前天当前时刻的数据,导致前一年的数不完整。
  使用参数lastCleanLogTime来记录上次清理的时间,并且与当前时间相差超过一天时才清理,能保证前一天的日志是完整的。
  问题:为什么每次只删除1000条日志?
  不明白为什么清理日志时,不是一次性删除全部的过期日志,而是每次删除1000条。按理说,这些旧的日志数据应该已经不在bufferpool中了,triggertime字段又是普通索引,那么DELETE操作会先更新到changebuffer中,之后再合并。现在先查询再删除,相当于多了一次IO且没有使用到changebuffer。JobScheduleHelper
  Job调度辅助类
  admin服务是用来管理和调度job的,用户也可以在它的管理后台新建一个job,配置CRON和JobHandler,然后admin服务就会按照配置的参数来调度job。很显然,这种自动化工作也是由线程定时执行的。
  1、如果使用线程调度Job,存在的第一个问题是:如果某个Job在调度时比较耗时,就可能阻塞后续的Job,导致后续job的执行有延迟,怎么解决这个问题?
  在前面JobTriggerPoolHelper我们已经知道,admin在调度job时是使用线程池、线程异步执行调度任务,避免了主线程的阻塞。
  2、使用线程定时调度job,存在的第二个问题是:怎么保证job在指定的时间执行,而不会出现大量延迟?
  admin使用预读的方式,提前读取在未来一段时间内要执行的job,提前取到内存中,并使用时间轮算法按时间分组job,把未来要执行的job下一个时间段执行。
  3、还隐藏第三个问题:admin服务是可以多实例部署的,在这种情况下该怎么避免一个job被多个实例重复调度?
  admin把一张数据表作为分布式锁来保证只有一个admin实例能执行job调度,又通过随机sleep线程一段时间,来降低线程之间的竞争。
  下面我们就通过代码来了解xxljob是怎么解决上述问题的。调度线程
  在该类中,定义了一个调度线程,用来调度要执行的job和已经过期一段时间的job,定义代码如下:预读的毫秒数publicstaticfinallongPREREADMS5000;预读和调度过期任务的线程privateThreadscheduleThread;预读
  下面代码中的pushTimeRing,是把job添加到一个map对象ringData中,然后让另一个线程从该map对象中取出,再次调度
  该线程会预读出下次执行时间now5000毫秒内的部分job,根据它们下一次执行时间划分成三段,执行三种不同的逻辑。
  1、下次执行时间在(,now5000)范围内
  说明过期时间已经大于5000毫秒,这时如果过期策略要求调度,就调度一次。代码如下:if(nowTimejobInfo。getTriggerNextTime()PREREADMS){logger。warn(xxljob,schedulemisfire,jobIdjobInfo。getId());MisfireStrategyEnummisfireStrategyEnumMisfireStrategyEnum。match(jobInfo。getMisfireStrategy(),MisfireStrategyEnum。DONOTHING);if(MisfireStrategyEnum。FIREONCENOWmisfireStrategyEnum){调度一次JobTriggerPoolHelper。trigger(jobInfo。getId(),TriggerTypeEnum。MISFIRE,1,null,null,null);logger。debug(xxljob,schedulepushtrigger:jobIdjobInfo。getId());}更新下一次执行时间refreshNextValidTime(jobInfo,newDate());}
  2、下次执行时间在〔now5000,now)范围内
  说明过期时间小于5000毫秒,只能算是延迟不能算是过期,直接调度一次,代码如下:if(nowTimejobInfo。getTriggerNextTime()){1、调度一次JobTriggerPoolHelper。trigger(jobInfo。getId(),TriggerTypeEnum。CRON,1,null,null,null);logger。debug(xxljob,schedulepushtrigger:jobIdjobInfo。getId());2、更新下一次调度时间refreshNextValidTime(jobInfo,newDate());3、如果当前job处于可以被调度的状态,且下一次执行时间在5000毫秒内,就记录下jobId,等待后面轮询if(jobInfo。getTriggerStatus()1nowTimePREREADMSjobInfo。getTriggerNextTime()){下次调度的时刻:秒intringSecond(int)((jobInfo。getTriggerNextTime()1000)60);保存进ringData中pushTimeRing(ringSecond,jobInfo。getId());刷新下一次的调度时间refreshNextValidTime(jobInfo,newDate(jobInfo。getTriggerNextTime()));}}
  如果job的下一次执行时间在5000毫秒以内,为了省下下次预读的IO耗时,这里会记录下jobid,等待后面的调度。
  3、下次执行时间在〔now,now5000)范围内
  说明还没到执行时间,先记录下jobid,等待后面的调度,代码如下:intringSecond(int)((jobInfo。getTriggerNextTime()1000)60);pushTimeRing(ringSecond,jobInfo。getId());refreshNextValidTime(jobInfo,newDate(jobInfo。getTriggerNextTime()));
  上面的3个步骤结束后,会更新jobInfo的triggerlasttime、triggernexttime、triggerstatus字段:更新job数据for(XxlJobInfojobInfo:scheduleList){XxlJobAdminConfig。getAdminConfig()。getXxlJobInfoDao()。scheduleUpdate(jobInfo);}
  可以看到,通过预读,一方面会把过期一小段时间的job执行一遍,另一方面会把未来一小段时间内要执行的job取出,保存进一个map对象ringData中,等待另一个线程调度。这样就避免了某些job到了时间还没执行。分布式锁
  因为admin是可以多实例部署的,所以在调度job时,需要考虑怎么避免job被多次调度。
  xxljob在前面JobFailMonitorHelper中遍历失败的job时,会对每个job设置一个无效的状态作为分布式行锁,如果设置失败就跳过。而在这里,如果还使用该方法,有可能出现,一个job被设置为无效状态后,线程就崩溃了,导致该job永远无法被调度。因此,要尽量避免对job状态的修改。
  在这里,admin服务使用一张表xxljoblock作为分布式锁,每个admin实例都要先尝试获取该表的锁,获取成功才能继续执行;同时,为了降低不同实例之间的竞争,会在线程开始执勤随机sleep一段时间。
  如何获取分布式锁?
  在线程中会开启一个事务,设置为手动提交,然后对表xxljoblock执行FORUPDATE查询。如果该线程执行语句成功,其他实例的线程就会排队等待该表的锁,实现了分布式锁功能。代码如下:获取数据库链接,通过SELECTFORUPDATE来尝试获取X锁在事务提交前一直持有该锁,其他实例的线程想获取该锁就会失败,并且会排队等待,直到第一个事务提交释放或锁超时connXxlJobAdminConfig。getAdminConfig()。getDataSource()。getConnection();connAutoCommitconn。getAutoCommit();conn。setAutoCommit(false);preparedStatementconn。prepareStatement(selectfromxxljoblockwherelocknameschedulelockforupdate);preparedStatement。execute();提交事务,释放X锁if(conn!null){try{conn。commit();}catch(SQLExceptione){。。。}try{conn。setAutoCommit(connAutoCommit);}catch(SQLExceptione){。。。}try{conn。close();}catch(SQLExceptione){。。。}}
  怎么降低锁的竞争?
  为了降低锁竞争,在线程开始前会先sleep40005000毫秒的随机值(不能大于5000毫秒,5000毫秒是预读的时间范围);在线程结束当前循环时,会根据耗时和是否有预读数据,选择不同的sleep策略:耗时超过1000毫秒,不sleep,直接开始下一次循环;耗时小于1000毫秒,根据是否有预读数据,sleep一个大小不同的随机时长:有预读数据,sleep时间短一些,在01000毫秒范围内;没有预读数据,sleep时间长一些,在04000毫秒范围内;
  代码如下:try{随机sleep40005000毫秒,通过这种方式,降低多实例部署时对锁的竞争这里也看出来,最多部署5000台实例。。TimeUnit。MILLISECONDS。sleep(5000System。currentTimeMillis()1000);}catch(InterruptedExceptione){if(!scheduleThreadToStop){logger。error(e。getMessage(),e);}}耗时超过1000毫秒,就不sleep不超过1000ms,就sleep一个随机时长longcostSystem。currentTimeMillis()start;if(cost1000){try{没有预读数据,就sleep时间长一点;有预读数据,就sleep时间短一些TimeUnit。MILLISECONDS。sleep((preReadSuc?1000:PREREADMS)System。currentTimeMillis()1000);}catch(InterruptedExceptione){if(!scheduleThreadToStop){logger。error(e。getMessage(),e);}}}ringThread:时间轮
  在前面的线程中,对即将要开始的job,不是立刻调度,而是按照执行的时刻(秒),把jobid保存进一个map中,然后由ringThread线程按时刻进行调度,这只典型的时间轮算法。代码如下:调度线程2privateThreadringThread;按时刻(秒)调度jobprivatevolatilestaticMapInteger,ListIntegerringDatanewConcurrentHashMap();调度过程ListIntegerringItemDatanewArrayList();每次取出2个时刻的job来调度intnowSecondCalendar。getInstance()。get(Calendar。SECOND);for(inti0;i2;i){ListIntegertmpDataringData。remove((nowSecond60i)60);if(tmpData!null){ringItemData。addAll(tmpData);}}遍历jobId,执行调度if(ringItemData。size()0){for(intjobId:ringItemData){JobTriggerPoolHelper。trigger(jobId,TriggerTypeEnum。CRON,1,null,null,null);}ringItemData。clear();}
  每次轮询调度时,只取出当前时刻(秒)、前一秒内的job,不会去调度与现在相隔太久的job。
  在执行轮询调度前,有一个时间在01000毫秒范围内的sleep。如果没有这个sleep,该线程会一直执行,而ringData中当前时刻(秒)的数据可能已经为空,会导致大量无效的操作;增加了这个sleep之后,可以避免这种无效的操作。之所以sleep时间在1000毫秒以内,是因为调度时刻最小精确到秒,一秒的sleep可以避免job的延迟。try{TimeUnit。MILLISECONDS。sleep(1000System。currentTimeMillis()1000);}catch(InterruptedExceptione){if(!ringThreadToStop){logger。error(e。getMessage(),e);}}
  问题:为什么在时间轮调度时,没有加分布式锁?
  因为在前面的scheduleThread线程中,最后一个操作是把job的nexttriggertime值更新为大于now5000毫秒,其他admin实例scheduleThread线程的查询条件是:nexttriggertimenow5000,不会查询出这里调度的job,所以不需要加分布式锁。
  至此,XxlJobSchedulerinit方法的作用我们介绍完毕,下面我们简单介绍一下XxlJobSchedulerdestroy方法XxlJobSchedulerdestroy
  destroy方法很简单,就是销毁前面初始化的线程池和线程,它销毁的顺序与前面启动的顺序相反。
  代码如下:销毁,销毁过程与init顺序相反publicvoiddestroy()throwsException{1、销毁调度线程JobScheduleHelper。getInstance()。toStop();2、销毁日志统计和清理线程JobLogReportHelper。getInstance()。toStop();3、销毁丢失job监视器线程JobCompleteHelper。getInstance()。toStop();4、销毁失败job监视器线程JobFailMonitorHelper。getInstance()。toStop();adminregistrystopJobRegistryHelper。getInstance()。toStop();admintriggerpoolstopJobTriggerPoolHelper。toStop();}
  因为各个toStop方法都很相似,所以我们只介绍JobScheduleHelper的toStop方法。
  该方法的步骤如下:
  1、设置停止标志位为true;
  2、sleep一段时间,让出CPU时间片给线程执行任务;
  3、如果线程不是终止状态(线程正在sleep),中断它;
  4、线程执行join方法,直到线程结束,执行最后一次。
  代码如下:scheduleThreadToStoptrue;给线程1s的时间去执行任务try{TimeUnit。SECONDS。sleep(1);}catch(InterruptedExceptione){logger。error(e。getMessage(),e);}如果线程不是终止状态,就让它执行完所有任务if(scheduleThread。getState()!Thread。State。TERMINATED){scheduleThread。interrupt();try{scheduleThread。join();}catch(InterruptedExceptione){logger。error(e。getMessage(),e);}}
  至此,JobScheduleHelper的主要功能就介绍完了,可以看出,admin服务在启动时,启动了多个线程池和线程,异步执行任务和异步响应executor的请求。
  下面,我们介绍前面涉及到的XxlJobTrigger和XxlJobCompleter。XxlJobTrigger
  调度job时的封装类
  XxlJobTrigger是调度job时的封装类,它主要工作就是接受传入的jobId、调度参数等,查询对应的jobGroup、jobInfo,然后调用ExecutorBiz对象来执行调度(run方法)。
  注意:这个类本身不会执行http请求,http请求是在core包下的工具类XxlJobRemotingUtil中执行的。
  该类中三个核心方法及其调用关系如下:triggerprocessTriggerrunExecutor,trigger
  该方法的功能比较简单,就是根据传入的参数查询jobGroup和jobInfo对象,设置相关的字段值,然后调用processTrigger方法。processTrigger
  该方法的主要工作分为以下几步:
  1、保存一条调度日志;
  2、从jobInfo、jobGroup中取出字段值,构造TriggerParam对象;
  3、根据jobInfo的路由策略,从jobGroup中取出要调度的executor地址;
  4、调用runExecutor方法执行调度;
  5、保存调度参数、设置调度信息、更新日志。
  这里不会修改jobInfo、jobGroup对象的字段值,只取出字段值来使用,对这两个对象字段的修改,是在前一步trigger方法中进行的。runExecutor
  该方法会执行调度,并返回调度结果,它的核心代码如下:ExecutorBizexecutorBizXxlJobScheduler。getExecutorBiz(address);runResultexecutorBiz。run(triggerParam);
  这里使用XxlJobScheduler类取出ExecutorBiz对象,以懒加载的方式给每个address创建一个ExecutorBiz对象,代码如下:privatestaticConcurrentMapString,ExecutorBizexecutorBizRepositorynewConcurrentHashMapString,ExecutorBiz();publicstaticExecutorBizgetExecutorBiz(Stringaddress)throwsException{validif(addressnulladdress。trim()。length()0){returnnull;}loadcacheaddressaddress。trim();ExecutorBizexecutorBizexecutorBizRepository。get(address);if(executorBiz!null){returnexecutorBiz;}setcacheexecutorBiznewExecutorBizClient(address,XxlJobAdminConfig。getAdminConfig()。getAccessToken());executorBizRepository。put(address,executorBiz);returnexecutorBiz;}
  吐槽一句:这个功能完全可以放在XxlJobTrigger类中、或者封装在ExecutorBiz内部,不知道为什么要放在XxlJobScheduler,平白无故多了一层调用。
  可以看出,该类中的三个方法其实可以归类为:preexecutepost,在执行前、执行时、执行后做一些前置和收尾工作。XxlJobCompleter
  job的完成类
  该类在前面JobCompleteHelper中被使用,最终job的完成就是在该类中执行的,该类有两个主要方法:updateHandleInfoAndFinish:公共方法,调用finishJob方法和更新日志;finishJob:私有方法,执行子任务和更新日志;
  下面主要介绍finishJob方法。finishJob
  finishJob的主要功能是:如果当前任务执行成功了,就调度它的所有子任务,最后把子任务的调度消息添加到当前job的日志中。代码如下:privatestaticvoidfinishJob(XxlJobLogxxlJobLog){1、job执行成功,开始调度子任务handlesuccess,totriggerchildjobStringBuildertriggerChildMsgnull;if(XxlJobContext。HANDLECODESUCCESSxxlJobLog。getHandleCode()){XxlJobInfoxxlJobInfoXxlJobAdminConfig。getAdminConfig()。getXxlJobInfoDao()。loadById(xxlJobLog。getJobId());if(xxlJobInfo!nullxxlJobInfo。getChildJobId()!nullxxlJobInfo。getChildJobId()。trim()。length()0){2、遍历子任务IDString〔〕childJobIdsxxlJobInfo。getChildJobId()。split(,);for(inti0;ichildJobIds。length;i){intchildJobId(childJobIds〔i〕!nullchildJobIds〔i〕。trim()。length()0isNumeric(childJobIds〔i〕))?Integer。parseInt(childJobIds〔i〕):1;if(childJobId0){3、调度子任务JobTriggerPoolHelper。trigger(childJobId,TriggerTypeEnum。PARENT,1,null,null,null);ReturnTStringtriggerChildResultReturnT。SUCCESS;4、添加日志信息。。。。略}}}5、保存子任务的调度消息到日志if(triggerChildMsg!null){xxlJobLog。setHandleMsg(xxlJobLog。getHandleMsg()triggerChildMsg);}}
  需要注意的是:
  1、这里依赖于JobTriggerPoolHelper来调度job,所以在JobCompleteHelper的监视线程开始时,有一个50秒的等待,就是等待JobTriggerPoolHelper启动完成;
  2、在finishJob方法中,调度子任务的时候,默认子任务的调度结果是成功,注意,这里是指调度这个行为是成功的,而不是指子任务执行是成功的。总结
  1、XxlJobAdminConfig作为admin服务的启动入口,要尽可能保持简洁,作用类似于一个仓库,来管理和持有所有的类和对象,并不会去启动具体的线程,它只需要按下启动器的按钮就可以了;
  2、XxlJobScheduler是admin服务的启动器类,它会调用各个辅助类(xxxHelper)来启动对应的线程;
  3、对外的接口,比如调度job、接收注册或下线等,都是使用线程池线程的异步方式实现,避免job对主线程的阻塞;
  4、对自动任务类的功能,都是使用线程定时执行;参考阅读
  XXLJOB分布式调度框架全面详解:https:juejin。cnpost6948397386926391333
  时间轮算法:https:spongecaptain。coolpostwidgettimingwheel
  一个开源的时间轮算法介绍:https:spongecaptain。coolpostwidgettimingwheel2

如果把地球直线挖通,人跳进去会摔死吗?还是会掉到另一端?为什把地球沿一条直径挖通,人竖直着跳进去,问人的运动情况会怎么样?这是一个非常常见的模型,这个简单的模型可以有助于理解很多概念。虽然现实中不可能挖出这样的洞,也不会让人跳进去。……下半年让人期待的三款旗舰手机2021年已经过半,下半年的旗舰才是重点。这三款手机必须关注。第一款:iPhone13系列去年发布的iPhone12系列,虽然没有高刷屏,没有大电池和快充,但是仍然在一片……微信才是内存杀手!别乱清理了,关闭这些功能,手机立马流畅如今的社会已经成为信息时代,而现在的信息时代最离不开的便是这人人手里都会拿着的手机。相信大家不论是看新闻头条,还是聊天,购物等等都会在手机上进行,所以现如今的主流必备品慢慢进化……装修时如何打造全屋WiFi?如何用800块,完爆别人家2000块的全屋WiFi,现在的人外出离不开手机,在家手机离不开WiFi,只要手机有电有网,在家大到办公,小到超市购物,都可以足不出户全部完成。……哪个牌子的学生平板好用又便宜?谢谢邀请。给孩子买平板,和家长说一堆软硬件知识和测评,用处不大。因为,懂这些的,不用你讲,不懂的,你讲了也还是不懂。我是一个初二孩子的家长,现在从传统媒体转型做移动互联网……华为,小米,OPPO,VIVO上班族选哪个?3000元左右的华为、小米、OPPO、ViVo,上班族选哪个吗?3000元左右的?新年新气象,换手机不踩坑。的确现如今智能手机竞争激烈。如果想要华为3000元以下的,不考虑5G手机,可以……英国自动驾驶事故由车企担责百度汽车2023年量产封面天天见封责任明确英国自动驾驶汽车事故由车企担责日前,英国法律监管机构提议,自动驾驶汽车在发生交通事故时,车内的驾乘人员将不用承担任何责任,而事故责任方将是自动驾驶技术研发企……你如果在农村有个小农场,你要怎么发展它?谢谢邀请。你的提问是如果家里有一个小农场应该怎么发展,关于这个问题我个人曾经考虑过多次,自已真想拥有五百亩地的一个小农场,我真想打造成一个现代化的农业庄圆和游玩宝地,……网易试水AI音乐创作,可10秒定制一首拜年歌北京商报讯(记者魏蔚)1月25日,网易推出一站式音乐创作平台网易天音。用户只需在微信搜索网易天音拜年小程序,输入祝福对象、祝福语,10秒就能搞定词曲编唱,定制一首拜年歌。量身定……天猫公布今年双11的节奏预售期售卖期均分为两个阶段出品搜狐科技编辑尹莉娜9月2日,阿里巴巴副总裁、天猫副总裁吹雪在阿里妈妈m峰会上正式公布了今年双11的节奏。吹雪介绍,今年双11和去年一样,依旧保持双节棍的节……小dou课堂揭秘空调一晚一度电,是真的吗?从时令上来说,现在已经入秋了。但是南昌的天气一点儿也没有秋天的味道,似乎比往常更热一些。一查天气,气温32,正是盛夏时节。想来,空调、WiFi、西瓜的日子不得不再延续几天。这不……北京年薪40万以上的程序员多吗?因为是985和211的硕士博士,还是有不少的能达到的,要达到这个地步其实也是不难的:1、进入BAT,工作5年左右,薪资股票大体接近100万2、选择比较热门的方向:大……
福源新城未来可期?兰州受困于两山夹一河太久了,严重制约了兰州经济社会发展,在第四版城市规划中,将河口、领袖山、彭家坪、九洲、青白石、和平、定远、连搭、金来等周边区域纳入了中心城区范围,这无疑是市……双11王炸出击realme真我Q2系列发布,998元起2020年10月13日中国深圳全球成长最快智能手机品牌realme真我于今日10:00正式召开王炸出击真我Q2系列新品发布会,推出专为双11而生的真我Q2、真我Q2Pro和真我……新兴技术落地,MicroLED直显技术引领新一代消费浪潮6月16日,利亚德推出MicroLED全系家用大电视,尺寸覆盖108英寸、135英寸、162英寸以及216英寸。纵观过去30年,LED显示行业发展路径可谓弹跳式飞跃,几乎……酷相思,梦落枫玉杵馀丹,金刀剩彩,重染吴江孤树。几点朱铅,几度怨啼秋暮。忆旧梦,绿鬓轻凋;诉新恨,绛唇微注。最堪怜,月拂渐霜,绣蓉一镜晚妆妒。千林摇落渐少,何事西风老……中国市场份额不足1,三星或在9月关闭在华最后手机工厂?据悉,三星最快会在今年9月份关闭在中国的最后一家手机工厂,目前他们正在做最后的清算工作。公开资料显示,由于三星手机在中国市场的存在感越来越弱,其最快会在今年9月份关闭在中国的最……简报任天堂2021年9月24日的直面会任天堂2021年9月24日的直面会,新品内容汇总,一些已经公布消息的作品比如《马里奥聚会》我就不讲了:小泉欢晃亲自主持怪物猎人riseG明确表示这是资料片,不过目前……华为nova4发布了,国产第一部屏下镜头手机戳到你哪个点了?2018年12月17日,国产第一部屏下镜头手机在汪涵何炅的老家湖南长沙正式发布没错,就是华为Nova4!正面采用一块6。4英寸挖孔全面屏,屏占比高达91。8采……RTX3080Ti挖矿性能曝光58MH,售价少于一万算我输近日,网上爆出即将发布的RTX3080Ti新卡的算力截图,58MHs,算力近乎RTX3090腰斩。不过这倒也不令人意外,因为从目前可信度较高的爆料来看,RTX3080Ti……家用空气净化器和新风系统有何异同?随着家用新风系统、空气净化器逐渐应用到室内通风换气、净化空气,人们对空气净化器和新风系统的认识也在逐渐深入,包括结构、功能、用途等等方面。以下就简单归纳空气净化器和新风机的之间……看视频追剧者的福利,办理这个业务不吃亏相信很多小伙伴都喜欢用手机看视频或者追剧,手机已经成为我们生活的一部分,没有生活,生活就会变得索然无味,利用手机上的各种视频软件来满足娱乐我们的日常生活,每个人的手机上都会有各……吴京被成为战狼公司代言人,上诉获赔34。2万元最近,战狼品牌管理有限公司(下称战狼公司)与吴京网络侵权责任纠纷一案,北京市第四中级人民法院驳回上诉,维持原判:战狼公司赔偿吴京经济损失34万元、公证费2000元,合计3……分布式光伏项目高速高质量增长,普洛斯普枫新能源明确三大方向作为国内屋顶光伏发电领域的头部企业,普洛斯普枫新能源已投资开发100多个物流园区的分布式光伏项目,并帮助这些物流园区至少在运营层面实现了碳中和。据悉,预计到今年底,其分布式光伏……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网