如果你想在Linux服务器上周期性地执行某个Python脚本,最出名的选择应该是Crontab脚本,但是Crontab具有以下缺点: 1。不方便执行strong秒级的任务strong。 2。当需要执行的定时任务有上百个的时候,Crontab的strong管理就会特别不方便strong。 另外一个选择是Celery,但是Celery的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery不会是一个好选择。 在你想要使用一个轻量级的任务调度工具,而且希望它尽量简单、容易使用、不需要外部依赖,最好能够容纳Crontab的所有基本功能,那么Schedule模块是你的不二之选。 使用它来调度任务可能只需要几行代码,感受一下:Python实用宝典 importschedule importtime defjob: print(Imworking。。。) schedule。every(10)。minutes。do(job) whileTrue: schedule。runpending time。sleep(1) 上面的代码表示每10分钟执行一次job函数,非常简单方便。你只需要引入schedule模块,通过调用strongtoutiaoorigincodeclasshighlighttextscedule。every(时间数)。时间类型。do(job)strong发布周期任务。 发布后的周期任务需要用strongtoutiaoorigincodeclasshighlighttextrunpendingstrong函数来检测是否执行,因此需要一个strongtoutiaoorigincodeclasshighlighttextWhilestrong循环不断地轮询这个函数。 下面具体讲讲Schedule模块的安装和初级、进阶使用方法。 1。准备 请选择以下任一种方式输入命令安装依赖: 1。Windows环境打开Cmd(开始运行CMD)。 2。MacOS环境打开Terminal(command空格输入Terminal)。 3。如果你用的是VSCode编辑器或Pycharm,可以直接使用界面下方的Terminal。pipinstallschedule 2。基本使用 最基本的使用在文首已经提到过,下面给大家展示更多的调度任务例子:Python实用宝典 importschedule importtime defjob: print(Imworking。。。) 每十分钟执行任务 schedule。every(10)。minutes。do(job) 每个小时执行任务 schedule。every。hour。do(job) 每天的10:30执行任务 schedule。every。day。at(10:30)。do(job) 每个月执行任务 schedule。every。monday。do(job) 每个星期三的13:15分执行任务 schedule。every。wednesday。at(13:15)。do(job) 每分钟的第17秒执行任务 schedule。every。minute。at(:17)。do(job) whileTrue: schedule。runpending time。sleep(1) 可以看到,从月到秒的配置,上面的例子都覆盖到了。不过如果你想只运行一次任务的话,可以这么配:Python实用宝典 importschedule importtime defjobthatexecutesonce: 此处编写的任务只会执行一次。。。 returnschedule。CancelJob schedule。every。day。at(22:30)。do(jobthatexecutesonce) whileTrue: schedule。runpending time。sleep(1) 参数传递 如果你有参数需要传递给作业去执行,你只需要这么做:Python实用宝典 importschedule defgreet(name): print(Hello,name) do将额外的参数传递给job函数 schedule。every(2)。seconds。do(greet,nameAlice) schedule。every(4)。seconds。do(greet,nameBob) 获取目前所有的作业 如果你想获取目前所有的作业:Python实用宝典 importschedule defhello: print(Helloworld) schedule。every。second。do(hello) alljobsschedule。getjobs 取消所有作业 如果某些机制触发了,你需要立即清除当前程序的所有作业:Python实用宝典 importschedule defgreet(name): print(Hello{}。format(name)) schedule。every。second。do(greet) schedule。clear 标签功能 在设置作业的时候,为了后续方便管理作业,你可以给作业打个标签,这样你可以通过标签过滤获取作业或取消作业。Python实用宝典 importschedule defgreet(name): print(Hello{}。format(name)) 。tag打标签 schedule。every。day。do(greet,Andrea)。tag(dailytasks,friend) schedule。every。hour。do(greet,John)。tag(hourlytasks,friend) schedule。every。hour。do(greet,Monica)。tag(hourlytasks,customer) schedule。every。day。do(greet,Derek)。tag(dailytasks,guest) getjobs(标签):可以获取所有该标签的任务 friendsschedule。getjobs(friend) 取消所有dailytasks标签的任务 schedule。clear(dailytasks) 设定作业截止时间 如果你需要让某个作业到某个时间截止,你可以通过这个方法:Python实用宝典 importschedule fromdatetimeimportdatetime,timedelta,time defjob: print(Boo) 每个小时运行作业,18:30后停止 schedule。every(1)。hours。until(18:30)。do(job) 每个小时运行作业,2030010118:33today schedule。every(1)。hours。until(2030010118:33)。do(job) 每个小时运行作业,8个小时后停止 schedule。every(1)。hours。until(timedelta(hours8))。do(job) 每个小时运行作业,11:32:42后停止 schedule。every(1)。hours。until(time(11,33,42))。do(job) 每个小时运行作业,202051711:36:20后停止 schedule。every(1)。hours。until(datetime(2020,5,17,11,36,20))。do(job) 截止日期之后,该作业将无法运行。 立即运行所有作业,而不管其安排如何 如果某个机制触发了,你需要立即运行所有作业,可以调用strongtoutiaoorigincodeclasshighlighttextschedule。runallstrong:Python实用宝典 importschedule defjob1: print(Foo) defjob2: print(Bar) schedule。every。monday。at(12:40)。do(job1) schedule。every。tuesday。at(16:40)。do(job2) schedule。runall 立即运行所有作业,每次作业间隔10秒 schedule。runall(delayseconds10) 3。高级使用 装饰器安排作业 如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:Python实用宝典 fromscheduleimportevery,repeat,runpending importtime 此装饰器效果等同于schedule。every(10)。minutes。do(job) repeat(every(10)。minutes) defjob: print(Iamascheduledjob) whileTrue: runpending time。sleep(1) 并行执行 默认情况下,Schedule按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。 不过你可以通过多线程的形式来运行每个作业以解决此限制:Python实用宝典 importthreading importtime importschedule defjob1: print(Imrunningonthreadsthreading。currentthread) defjob2: print(Imrunningonthreadsthreading。currentthread) defjob3: print(Imrunningonthreadsthreading。currentthread) defrunthreaded(jobfunc): jobthreadthreading。Thread(targetjobfunc) jobthread。start schedule。every(10)。seconds。do(runthreaded,job1) schedule。every(10)。seconds。do(runthreaded,job2) schedule。every(10)。seconds。do(runthreaded,job3) whileTrue: schedule。runpending time。sleep(1) 日志记录 Schedule模块同时也支持logging日志记录,这么使用:Python实用宝典 importschedule importlogging logging。basicConfig scheduleloggerlogging。getLogger(schedule) 日志级别为DEBUG schedulelogger。setLevel(levellogging。DEBUG) defjob: print(Hello,Logs) schedule。every。second。do(job) schedule。runall schedule。clear 效果如下:DEBUG:schedule:Runningall1jobswith0sdelayinbetween DEBUG:schedule:RunningjobJob(interval1,unitseconds,dojob,args,kwargs{}) Hello,Logs DEBUG:schedule:Deletingalljobs 异常处理 Schedule不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。 你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:Python实用宝典 importfunctools defcatchexceptions(cancelonfailureFalse): defcatchexceptionsdecorator(jobfunc): functools。wraps(jobfunc) defwrapper(args,kwargs): try: returnjobfunc(args,kwargs) except: importtraceback print(traceback。formatexc) ifcancelonfailure: returnschedule。CancelJob returnwrapper returncatchexceptionsdecorator catchexceptions(cancelonfailureTrue) defbadtask: return10 schedule。every(5)。minutes。do(badtask) 这样,strongtoutiaoorigincodeclasshighlighttextbadtaskstrong在执行时遇到的任何错误,都会被strongtoutiaoorigincodeclasshighlighttextcatchexceptionsstrong捕获,这点在保证调度任务正常运转的时候非常关键。