来源:早起Python 作者:刘早起 在Python中,多线程最常见的一个场景就是爬虫,例如这样一个需求,有多个结构一样的页面需要爬取,例如下方的URL(豆瓣阿凡达影评,以10个为例)urllist〔https:movie。douban。comsubject1652587reviews?sorttimestart0,https:movie。douban。comsubject1652587reviews?sorttimestart20,https:movie。douban。comsubject1652587reviews?sorttimestart40,https:movie。douban。comsubject1652587reviews?sorttimestart60,https:movie。douban。comsubject1652587reviews?sorttimestart80,https:movie。douban。comsubject1652587reviews?sorttimestart100,https:movie。douban。comsubject1652587reviews?sorttimestart120,https:movie。douban。comsubject1652587reviews?sorttimestart140,https:movie。douban。comsubject1652587reviews?sorttimestart160,https:movie。douban。comsubject1652587reviews?sorttimestart180〕 如果依次爬取,请求第一个页面得到返回数据解析数据提取、存储数据请求第二个页面,按照这样的思路,那么大量时间都会浪费在请求、返回数据上,如果在等待第一个页面返回数据时去请求第二个页面,就能有效的提高效率,多线程就可以实现这样的功能。 在Python中实现多线程的方法也很多,我将基于threading模块一点一点介绍,注意本文不会太注重于多线程背后的技术概念(面试常问),仅希望用最少的话教会大家如何实现。当然会在最后介绍如何使用threading模块来解决上面的爬虫问题。threading基本使用 让我们先从一个简单的例子开始,定义dosomething函数,执行该函数需要消耗1秒importtimestarttime。perfcounter()defdosomething():print(线程启动)time。sleep(1)print(线程结束)dosomething()finishtime。perfcounter()print(f全部任务执行完成,耗时{round(finishstart,2)}秒) 上面的代码不难理解,执行dosomething并计算耗时,结果很明显应该是1s线程启动线程结束全部任务执行完成,耗时1。01秒 现在如果需要执行两次dosomething,按照最基本的思路importtimestarttime。perfcounter()defdosomething():print(线程启动)time。sleep(1)print(线程结束)dosomething()dosomething()finishtime。perfcounter()print(f全部任务执行完成,耗时{round(finishstart,2)}秒) 执行上面代码结果也很容易猜到是2秒线程启动线程结束线程启动线程结束全部任务执行完成,耗时2。01秒 这就是最常规的同步思路,在CPU执行第一个函数,也就是等待1s的时间内,什么也不干,等第一个函数执行完毕后再执行第二个函数 很明显,这样让CPU干等着啥也不干并不是一个很好的选择,而多线程就是解决这一问题的方法之一,让CPU在等待某个任务完成时去执行更多的操作,将整个过程简化为下图流程,这样就能充分节省时间 现在使用threading来通过多线程的方式实现上面的过程,非常简单,定义两个线程并依次启动即可importtimeimportthreadingstarttime。perfcounter()defdosomething():print(线程启动)time。sleep(1)print(线程结束)thread1threading。Thread(targetdosomething)thread2threading。Thread(targetdosomething)thread1。start()thread2。start()finishtime。perfcounter()print(f全部任务执行完成,耗时{round(finishstart,2)}秒) 执行上面的代码,结果如下线程启动线程启动全部任务执行完成,耗时0。0秒线程结束线程结束 可以看到,两个子线程确实同时启动,但是主线程并未等待两个子线程执行完毕就直接结束。 为了解决这个问题,我们可以使用threading。join()方法,意思是在子线程完成运行之前,这个子线程的父线程将一直被阻塞 换成人话就是让主线程挂起,等待所有子线程结束再执行,体现到代码上也很简单,只需要添加两行即可importtimeimportthreadingstarttime。perfcounter()defdosomething():print(线程启动)time。sleep(1)print(线程结束)thread1threading。Thread(targetdosomething)thread2threading。Thread(targetdosomething)thread1。start()thread2。start()thread1。join()thread2。join()finishtime。perfcounter()print(f全部任务执行完成,耗时{round(finishstart,2)}秒) 运行结果如下,全部代码在1秒内运行完毕线程启动线程启动线程结束线程结束全部任务执行完成,耗时1。01秒 至此,我们就得到了第一个有效的多线程代码,相信你也能大致明白threading的基本使用流程。传递参数 现在来看看如何在多线程之间传递参数,让我们升级代码:dosomething函数来接受一个参数,控制他睡眠等待的时间defdosomething(num):print(f线程{num}启动,睡眠{num}秒)time。sleep(num)print(f线程{num}结束) 在threading中,创建线程时可以使用args来传递参数,例如现在接收一个参数,则上一小节的代码可以如下修改importtimeimportthreadingstarttime。perfcounter()defdosomething(num):print(f线程{num}启动,睡眠{num}秒)time。sleep(num)print(f线程{num}结束)thread1threading。Thread(targetdosomething,args〔1〕)thread2threading。Thread(targetdosomething,args〔2〕)thread1。start()thread2。start()thread1。join()thread2。join()finishtime。perfcounter()print(f全部任务执行完成,耗时{round(finishstart,2)}秒) 这段代码中,我分别让两个线程等待1、2秒,运行结果显然应该是2秒线程1启动,睡眠1秒线程2启动,睡眠2秒线程1结束线程2结束全部任务执行完成,耗时2。01秒 如果你的线程函数需要更多的参数,只需要依次向args中追加即可。简化代码 上面的案例中,我们仅开启了两个线程,如果是更多个线程的话,再依次重复定义、启动就会显得十分繁琐,此时我们可以使用循环来处理。 例如开启10个线程,依次睡眠110秒,可以先创建一个list用于存储每个线程,接着利用循环依次创建线程,启动后追加到刚刚创建的list中,之后再依次等待每个线程执行完毕,代码如下importtimeimportthreadingstarttime。perfcounter()defdosomething(num):print(f线程{num}启动,睡眠{num}秒)time。sleep(num)print(f线程{num}结束)threadlist〔〕foriinrange(1,11):threadthreading。Thread(targetdosomething,args〔i〕)thread。start()threadlist。append(thread)fortinthreadlist:t。join()finishtime。perfcounter()print(f全部任务执行完成,耗时{round(finishstart,2)}秒) 结果是显然的,虽然我们执行了十次dosomething,每次用时110秒,但总耗时应该为10秒线程1启动,睡眠1秒线程2启动,睡眠2秒线程3启动,睡眠3秒线程4启动,睡眠4秒线程5启动,睡眠5秒线程6启动,睡眠6秒线程7启动,睡眠7秒线程8启动,睡眠8秒线程9启动,睡眠9秒线程10启动,睡眠10秒线程1结束线程2结束线程3结束线程4结束线程5结束线程6结束线程7结束线程8结束线程9结束线程10结束全部任务执行完成,耗时10。01秒共享变量锁的问题 现在,你应该已经了解threading最基本的用法,只需要将dosomthing函数进行修改即可,但是如果你深入使用,还会有其他的问题出现,例如共享变量的问题,让我们继续探讨。 多线程很常见的一个应用就是爬虫,回到开头的爬虫问题,如果我们希望爬取10个网页的评论,可能会先定一个空dataframe,然后使用多线程都往这个dataframe中写入数据,但由于多个线程同时操作这一个变量,可能会导致评论并不是按照顺序写入的。 例如第一个页面有10条评论,第一个线程写入了2条后,第二个线程将第二个页面的前两条写入,最终导致十个页面的评论是乱序存储! 让我们把这个问题抽象出来,还是之前的代码,稍微修改一下 我们先定义了一个空list,线程函数会将传入的数字添加到该list中,在未加锁的情况下,由于线程竞争,虽然我们线程是按照顺序开启,但是最终数字并不是按照顺序写入。 有没有办法解决呢?当然有,很自然的想法就是当第一个线程操作该变量时,其他线程等着,写完了再释放,这就是锁! 先看代码 在上面的代码中,我们使用threding。Lock创建了一个线程锁,之后在线程函数操作result前,首先使用lock。acquire()加上锁,之后操作results,在修改完后使用lock。relese()释放,此时其他线程若想操作results则会阻塞,等该线程释放后才能拿走操作中,这样我们就保证了线程是安全的! 最基本的线程锁用法就如上面代码所示,定义锁上锁解锁,但是一定要注意,lock。acquire()和lock。relese(),如果加了锁但是没有释放,后面的线程将会全部阻塞!限制线程数量 最后还有一个常见的问题,上面我们需要执行几次线程函数就开了几个线程,但是如果需要爬成千上万个网页,开这么多线程cpu一定不同意,代码也会在开启的线程达到一定数量后报错。 所以如何让程序只启动我们指定的线程数量,例如一次开五个线程,结束一个再添加一个,直到全部任务完成? 还是锁!在threading模块中有一个BoundedSemaphore(信号量)类,我们可以给他一个初始的信号量(最大线程数),之后每次有线程获得信号量的时候(即acquire())计数器1,释放信号量时候(release())计数器1,计数器为0的时候其它线程就被阻塞无法获得信号量。当计数器为设定好的上限的时候BoundedSemaphore就无法进行release()操作了。 体现到代码上则比较简单,还是基于上面的例子修改 总共需要运行十次,我们定义最大线程数为3,并在线程启动前调用acquire方法增加一个计数,在线程最后释放。 此时程序一次只能启动三个线程,如图中所示,首先启动123,之后完成123,启动456,当第四个线程结束启动第七个线程直到全部线程结束。 这里我们同时使用了上一节说的线程锁来保护变量,用BoundedSemaphore锁来控制最大线程数,在实际写代码时就需要小心检查锁是否正确释放,否则就会报错!一个真实的多线程爬虫案例 至此,threading模块最常见的用法就介绍完毕,现在让我们回到本文一开始的问题,有多个(以十个为例)URL需要爬取,既然每个页面需要执行的操作一样,如果等待一个页面爬取完毕再爬第二页面就太浪费时间了。这时就可以仿照上面的思路去使用多线程加速。 我们只需要将上面的dosomething函数修改为对也面的爬取操作,之后的创建启动线程操作不变即可,代码如下importtimeimportthreadingimportrequestsimportpandasaspdfromfakerimportFakerfrombs4importBeautifulSoupdefcrawurl(url):globaldffakeFaker()headers{UserAgent:fake。useragent()}rrequests。get(url,headersheaders)soupBeautifulSoup(r。content,html。parser)reviewlistsoup。findall(classmainreviewitem)foriinrange(len(reviewlist)):rankreviewlist〔i〕。select(span)〔0〕。get(title)time1reviewlist〔i〕。select(span)〔1〕。get(content)titlereviewlist〔i〕。select(h2a)〔0〕。textdfdf。append({时间:time1,评分:rank,标题:title,},ignoreindexTrue)print(爬取完成)ifnamemain:starttime。perfcounter()dfpd。DataFrame(columns〔时间,评分,标题〕)urllist〔https:movie。douban。comsubject1652587reviews?sorttimestart0,https:movie。douban。comsubject1652587reviews?sorttimestart20,https:movie。douban。comsubject1652587reviews?sorttimestart40,https:movie。douban。comsubject1652587reviews?sorttimestart60,https:movie。douban。comsubject1652587reviews?sorttimestart80,https:movie。douban。comsubject1652587reviews?sorttimestart100,https:movie。douban。comsubject1652587reviews?sorttimestart120,https:movie。douban。comsubject1652587reviews?sorttimestart140,https:movie。douban。comsubject1652587reviews?sorttimestart160,https:movie。douban。comsubject1652587reviews?sorttimestart180〕threadlist〔〕foriinurllist:threadthreading。Thread(targetcrawurl,args〔i〕)thread。start()threadlist。append(thread)fortinthreadlist:t。join()finishtime。perfcounter()print(f全部任务执行完成,耗时{round(finishstart,2)}秒) 执行这段代码,差不多仅用了1秒就将全部内容爬取并存储到dataframe中,比同步的代码块了近十倍!如果感兴趣的话可以自己尝试一下。 至此,有关Python多线程模块threading的基本用法与需要注意的几点就介绍完毕,如果全部认真看完的话,我相信你一定能照猫画虎写出第一个多线程爬虫脚本。 当然有关Python多线程还有很多饱受诟病的争议(GIL),多线程的实现方法也远不止threading模块,例如更常见的写法是通过concurrent。futures模块以及多进程、协程,这些都留在本系列后续文章中再进一步讨论!