ZeroMQ无锁队列的原理与实现
7月7日 壹世缘投稿 前言
本文介绍ZMQ无锁队列的原理与实现
无锁队列用在什么地方?每秒几十万的元素时再考虑使用无锁队列,比如股票行情这种。如果队列里一秒就几千几万的元素,那就不需要使用无锁队列,性能没有太大的提高。
源码:https:github。comgopherWxfcclinuxLearningCodetreemaster3。2。4E697A0E99481E9989FE58897freequeue1。为什么需要无锁队列
锁是解决并发问题的万能钥匙,可是并发问题只有锁能解决吗?锁引起的问题:Cache损坏(Cachetrashing)
线程间频繁切换的时候会导致Cache中数据的丢失,Cache中的数据会失效,因为它缓存的是将被换出任务的数据,这些数据对于新换进的任务是没的。处理器的运速度主存快N倍,所以量的处理器时间被浪费在处理器与主存的数据传输上。这就是在处理器和主存之间引Cache的原因。Cache是种速度更快但容量更的内存(也更加昂贵),当处理器要访问主存中的数据时,这些数据先被拷到Cache中,因为这些数据在不久的将来可能会被处理器访问。Cachemisses对性能有常的影响,因为处理器访问Cache中的数据将直接访问主存快得多。在保存和恢复上下的过程中还隐藏了额外的开销。在同步机制上争抢队列
阻塞不是微不道的操作。它导致操作系统暂停当前的任务或使其进睡眠状态(等待,不占任何的处理器)。直到资源(例如互斥锁)可,被阻塞的任务才可以解除阻塞状态(唤醒)。在个负载较重的应程序中使这样的阻塞队列来在线程之间传递消息会导致严重的争问题。也就是说,任务将量的时间(睡眠,等待,唤醒)浪费在获得保护队列数据的互斥锁,不是处理队列中的数据上。
阻塞机制展伸的机会到了。任务之间不争抢任何资源,在队列中预定个位置,然后在这个位置上插或提取数据。这中机制使了种被称之为CAS(较和交换)的特殊操作,这个特殊操作是种特殊的指令,它可以原的完成以下操作:它需要3个操作数m,A,B,其中m是个内存地址,操作将m指向的内存中的内容与A较,如果相等则将B写到m指向的内存中并返回true,如果不相等则什么也不做返回false。简而言之非阻塞的机制使用了CAS的特殊操作,使得任务之间可以不争抢任何资源,然后在队列中预定的位置上,插入或者提取数据。CAS底层实现多线程动态内存分配malloc性能下降
在多线程系统中,需要仔细的考虑动态内存分配。当个任务从堆中分配内存时,标准的内存分配机制会阻塞所有与这个任务共享地址空间的其它任务(进程中的所有线程)。这样做的原因是让处理更简单,且它作得很好。两个线程不会被分配到块相同的地址的内存,因为它们没办法同时执分配请求。显然线程频繁分配内存会导致应程序性能下降(必须注意,向标准队列或map插数据的时候都会导致堆上的动态内存分配)2。无锁队列的实现(参考zmq,只支持一写一读的场景)2。1无锁队列前言
TODOgit地址补充源码的ypipe。hpp、yqueue。hpp,这些源码可以在程项使,但要注意,这只持单写单读的场景。其中yqueue是用来设计队列,ypipe用来设计队列的写入读取时机、回滚以及flush,首先我们来看yqueue的设计。2。2原子操作函数介绍templatetypenameTclassatomicptrt{public:voidset(Tptr);原操作Txchg(Tval);原操作,设置个新的值,然后返回旧的值Tcas(Tcmp,Tval);原操作private:volatileT};set函数,把私有成员ptr指针设置成参数ptr的值,不是个原操作,需要使者确保执set过程没有其他线程使ptr的值。xchg函数,把私有成员ptr指针设置成参数val的值,并返回ptr设置之前的值。原操作,线程安全。cas函数,原操作,线程安全,把私有成员ptr指针与参数cmp指针较:如果相等返回ptr设置之前的值,并把ptr更新为参数val的值,如果不相等直接返回ptr值。2。3yqueuet的chunk块机制
2。3。1chunk块机制一次分配多个元素
首先我们需要考虑元素的分配,元素存在哪里?yqueue中的数据结构使用的chunk块机制,每次批量分配一批元素,这样可以减少内存的分配和释放yqueuet内部由个个chunk组成,每个chunk保存N个元素:sparechunk,当再次需要分配chunkt的时候从sparechunk中获取。
当队列空间不时每次分配个chunkt,每个chunkt能存储N个元素。在数据出队列后,队列有多余空间的时候,回收的chunk也不是上释放,是根据局部性原理先回收到structchunkt{Tvalues〔N〕;每个chunkt可以容纳N个T类型的元素,以后就以一个chunkt为单位申请内存};
2。3。2chunk块机制局部性原理
程序局部性原理:是指程序在执行时呈现出局部性规律,即在一段时间内,整个程序的执行仅限于程序中的某一部分。相应地,执行所访问的存储空间也局限于某个内存区域,具体来说,局部性通常有两种形式:时间局部性和空间局部性。
时间局部性:被引用过一次的存储器位置在未来会被多次引用(通常在循环中)。
空间局部性:如果一个存储器的位置被引用,那么将来他附近的位置也会被引用。
在yqueuet类中有一个sparechunk用于保存最近的空闲块。也就是说,在将一个chunk中的所有元素都pop掉了,那么我们可以free这个chunk。但是我们可以保存一块最近的空闲块,以后如果chunk不够用时,扩容chunk就不用malloc,直接复用该sparechunk即可。根据局部性原理,这个sparechunk的地址或者内存页很有可能还在cache里,那么这样的机制就可以减少一次malloc以及存入cache的操作。classyqueuetPeoplearelikelytoproduceandconsumeatsimilarrates。Inthisscenarioholdingontothemostrecentlyfreedchunksavesusfromhavingtocallmallocfree。空闲块(把所有元素都已经出队的块称为空闲块),读写线程的共享变量
可以看到在pop的时候,如果删除满一格chunk,就把这个chunk放到sparechunk里。Removesanelementfromthefrontendofthequeue。inlinevoidpop(){if(beginposN)删除满一个chunk才回收chunk{beginchunkprevNULL;beginpos0;ohasbeenmorerecentlyusedthansparechunk,soforcachereasonswellgetridofthespareanduseoasthespare。chunktcssparechunk。xchg(o);由于局部性原理,总是保存最新的空闲块而释放先前的空闲快free(cs);}}
在push的时候,如果chunk满了,说明要发生扩容,那么我们优先从sparechunk取出最近的空闲块当新的chunk来使用Addsanelementtothebackendofthequeue。inlinevoidpush(){if(endpos!N)endpos!N表明这个chunk节点还没有满chunktscsparechunk。xchg(NULL);为什么设置为NULL?因为如果把之前值取出来了则没有sparechunk了,所以设置为NULLif(sc)如果有sparechunk则继续复用它{}else没有则重新分配{staticintscout0;printf(scout:d,scout);endchunknext(chunkt)malloc(sizeof(chunkt));分配一个chunkallocassert(endchunknext);}endpos0;}
相关视频推荐
【C后端开发】C无锁队列设计与实现
高并发场景下,三种锁方案:互斥锁,自旋锁,原子操作的优缺点
学习地址:CCLinux服务器开发后台架构师【零声教育】学习视频教程腾讯课堂
需要CCLinux服务器架构师学习资料加qun812855908获取(资料包括CC,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCPIP,协程,DPDK,ffmpeg等),免费分享
2。4yqueuet成员和接口介绍
yqueuet的作用就是管理元素、管理chunk。chunk和sparechunk上文已经说过了,这里不再赘述。Tisthetypeoftheobjectinthequeue。队列中元素的类型Nisgranularity(粒度)ofthequeue,简单来说就是chunkt个结点可以装载N个T类型的元素templatetypenameT,intNclassyqueuet{public:inlineyqueuet();创建队列。inlineyqueuet();销毁队列。inlineTfront();Returnsreferencetothefrontelementofthequeue。Ifthequeueisempty,behaviourisundefined。inlineTback();Returnsreferencetothebackelementofthequeue。Ifthequeueisempty,behaviourisundefined。inlinevoidpush();Addsanelementtothebackendofthequeue。inlinevoidpop();Removesanelementfromthefrontofthequeue。inlinevoidunpush()Removeselementfromthebackendofthequeue。回滚时使private:InpidualmemorychunktoholdNelements。structchunkt{Tvalues〔N〕;};空闲块(我把所有元素都已经出队的块称为空闲块),读写线程的共享变量};
2。4。1beginbackendchunk与beginbackendpos成员介绍
yqueuet内部有三个chunkt类型指针以及对应的索引位置:beginchunkbeginpos:beginchunk用于指向队列的第一个chunk,beginpos用于指向第一个chunk的第一个元素的索引位置,因为pop(),所以第一个元素不可能永远是0,会随着pop而改变。同理第一个chunk也会被回收,也需要记录第一个chunk的位置。backchunkbackpos:beginchunk用于指向队列的最后一个chunk,backpos用于指向最后一个chunk的最后一个元素的索引位置。endchunkendpos:在最后一个chunk未满的情况下,endchunk和backchunk是相同的,backpos的下一个就是endpos。在最后一个chunk满的情况下,endchunk指向新分配的chunk,endpos0。也就是说endchunk和endpos是辅助backchunkbackpos的,可以理解为探测。
2。4。2函数介绍
frount和pop连用。back和push连用。
构造函数yqueuet
预先分配个chunk。创建队列。inlineyqueuet(){beginchunk(chunkt)malloc(sizeof(chunkt));allocassert(beginchunk);beginpos0;backchunkNULL;backchunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空backpos0;endchunk总是指向链表的最后一个chunkendpos0;}
稀构函数yqueuet
销毁所有的chunk销毁队列。inlineyqueuet(){while(true){if(beginchunkendchunk){free(beginchunk);}free(o);}chunktscsparechunk。xchg(NULL);free(sc);}
front、back函数
这的front()或者back()函数,需要注意的返回的是左值引,我们可以修改其值。
对于先进后出队列:beginchunkvalues〔beginpos〕代表队列头可读元素,读取队列头元素即是读取beginpos位置的元素;backchunkvalues〔backpos〕代表队列尾可写元素,写元素时则是更新backpos位置的元素,要确保元素真正效,还需要调push函数更新backpos的位置,避免下次更新的时候是更新当前backpos位置对应的元素。Returnsreferencetothefrontelementofthequeue。Ifthequeueisempty,behaviourisundefined。返回队列头部元素的引用,调用者可以通过该引用更新元素,结合pop实现出队列操作。inlineTfront()返回的是引用,是个左值,调用者可以通过其修改容器的值{returnbeginchunkvalues〔beginpos〕;}Returnsreferencetothebackelementofthequeue。Ifthequeueisempty,behaviourisundefined。返回队列尾部元素的引用,调用者可以通过该引用更新元素,结合push实现插入操作。如果队列为空,该函数是不允许被调用。inlineTback()返回的是引用,是个左值,调用者可以通过其修改容器的值{returnbackchunkvalues〔backpos〕;}
push函数当endpos!N时,说明当前的chunk还有空余位置可以插入,则不需要扩容当endposN时,说明当前的chunk已经插入满了,下一次插入就要插入到新的chunk了,所以需要发生扩容
需要新分配chunk时,先尝试从sparechunk获取,如果获取到则直接使,如果sparechunk为NULL则需要重新分配chunk。最终都是要更新endchunk和endpos。Addsanelementtothebackendofthequeue。inlinevoidpush(){if(endpos!N)endpos!N表明这个chunk节点还没有满chunktscsparechunk。xchg(NULL);为什么设置为NULL?因为如果把之前值取出来了则没有sparechunk了,所以设置为NULLif(sc)如果有sparechunk则继续复用它{}else没有则重新分配{staticintscout0;printf(scout:d,scout);endchunknext(chunkt)malloc(sizeof(chunkt));分配一个chunkallocassert(endchunknext);}endpos0;}
unpush函数
unpush函数没什么好说的,也是考虑有没有发生扩容的情况,然后分两种情况回退即可。Removeselementfromthebackendofthequeue。Inotherwordsitrollbackslastpushtothequeue。Takecare:Callerisresponsiblefordestroyingtheobjectbeingunpushed。Thecallermustalsoguaranteethatthequeueisntemptywhenunpushiscalled。Itcannotbedoneautomaticallyasthereadsideofthequeuecanbemanagedbydifferent,completelyunsynchronisedthread。必须要保证队列不为空,参考ypipet的uwriteinlinevoidunpush(){First,movebackonepositionbackwards。if(backpos)从尾部删除元素else{backposN1;回退到前一个}Now,moveendpositionbackwards。Notethatobsoleteendchunkisnotusedasasparechunk。Theanalysisshowsthatdoingsowouldrequirefreeandatomicoperationperchunkdeallocatedinsteadofasimplefree。if(endpos)意味着当前的chunk还有其他元素占有else{endposN1;当前chunk没有元素占用,则需要将整个chunk释放free(endchunknext);endchunknextNULL;}}
pop函数beginpos!N,说明当前chunk还有元素没被取出,该chunk还要继续被使;endposN,说明该chunk的所有元素已经被取出,所以该chunk要被回收。把最后回收的chunk保存到sparechunk,然后释放之前sparechunk保存的chunk。
这有两个点需要注意:pop掉的元素,其销毁作交给调者完成,即是pop前调者需要通过front()接读取并进销毁空闲块的保存,要求是原操作。因为闲块是读写线程的共享变量,因为在push中也使了sparechunk。Removesanelementfromthefrontendofthequeue。inlinevoidpop(){if(beginposN)删除满一个chunk才回收chunk{beginchunkprevNULL;beginpos0;ohasbeenmorerecentlyusedthansparechunk,soforcachereasonswellgetridofthespareanduseoasthespare。chunktcssparechunk。xchg(o);由于局部性原理,总是保存最新的空闲块而释放先前的空闲快free(cs);}}2。5ypipeyqueue的封装
yqueue负责元素内存的分配与释放,入队以及出队列;ypipe负责yqueue读写指针的变化。ypipet在yqueuet的基础上构建个单写单读的锁队列templatetypenameT,intNclassypipet{public:Initialisesthepipe。inlineypipet();Thedestructordoesnthavetobevirtual。ItismadvirtualjusttokeepICCandcodecheckingtoolsfromcomplaining。inlinevirtualypipet();Writeanitemtothepipe。Dontflushityet。Ifincompleteissettotruetheitemisassumedtobecontinuedbyitemssubsequentlywrittentothepipe。Incompleteitemsareneverflusheddownthestream。写数据,incomplete参数表示写是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。inlinevoidwrite(constTvalue,boolincomplete);Popanincompleteitemfromthepipe。Returnstrueissuchitemexists,falseotherwise。inlineboolunwrite(Tvalue);Flushallthecompleteditemsintothepipe。Returnsfalseifthereaderthreadissleeping。Inthatcase,callerisobligedtowakethereaderupbeforeusingthepipeagain。刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调者需要唤醒读线程。inlineboolflush();Checkwhetheritemisavailableforreading。这有两个点,个是检查是否有数据可读,个是预取inlineboolcheckread();Readsanitemfromthepipe。Returnsfalseifthereisnovalue。available。inlineboolread(Tvalue);Appliesthefunctionfntothefirstelemenentinthepipeandreturnsthevaluereturnedbythefn。Thepipemustntbeemptyorthefunctioncrashes。inlineboolprobe(bool(fn)(T));protected:Allocationefficientqueuetostorepipeitems。Frontofthequeuepointstothefirstprefetcheditem,backofthepipepointstolastunflusheditem。Frontisusedonlybyreaderthread,whilebackisusedonlybywriterthread。yqueuetT,NPointstothefirstunflusheditem。Thisvariableisusedexclusivelybywriterthread。Tw;指向第个未刷新的元素,只被写线程使Pointstothefirstunprefetcheditem。Thisvariableisusedexclusivelybyreaderthread。Tr;指向第个还没预提取的元素,只被读线程使Pointstothefirstitemtobeflushedinthefuture。Tf;指向下轮要被刷新的批元素中的第个Thesinglepointofcontentionbetweenwriterandreaderthread。Pointspastthelastflusheditem。IfitisNULL,readerisasleep。Thispointershouldbealwaysaccessedusingatomicoperations。atomicptrtTc;读写线程共享的指针,指向每轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)Disablecopyingofypipeobject。ypipet(constypipet);constypipetoperator(constypipet);};
2。5。1如何写入和读取
这一节的目的是找出改变wrfc这四个指针的的函数,至于函数的具体作用会放下下面写。
写入可以单独写,也可以批量写,先来看看write函数。可以看到如果incompletetrue,则说明在批量写,直到incompletefalse时,进行写提交。Writeanitemtothepipe。Dontflushityet。Ifincompleteissettotruetheitemisassumedtobecontinuedbyitemssubsequentlywrittentothepipe。Incompleteitemsareneverflusheddownthestream。写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。inlinevoidwrite(constTvalue,boolincomplete){Placethevaluetothequeue,addnewterminatorelement。queue。back()queue。push();Movetheflushuptoherepoiter。if(!incomplete){fqueue。back();记录要刷新的位置}}
1。单次写yquque。write(count,false);yquque。flush();2。批量写yquque。write(count,true);yquque。write(count,true);yquque。write(count,false);yquque。flush();
上面两种方式最后都用到了flush,下面来看看flush。Flushallthecompleteditemsintothepipe。Returnsfalseifthereaderthreadissleeping。Inthatcase,callerisobligedtowakethereaderupbeforeusingthepipeagain。刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。批量刷新的机制,写入批量后唤醒读线程;反悔机制unwriteinlineboolflush(){Iftherearenounflusheditems,donothing。if(wf)不需要刷新,即是还没有新元素加入Trytosetctof。read时如果没有数据可以读取则c的值会被置为NULLif(c。cas(w,f)!w)尝试将c设置为f,即是准备更新w的位置{CompareandswapwasunseccessfulbecausecisNULL。Thismeansthatthereaderisasleep。Thereforewedontcareaboutthreadsafenessandupdatecinnonatomicmanner。Wellreturnfalsetoletthecallerknowthatreaderissleeping。c。set(f);更新为新的f位置线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理}else读端还有数据可读取{Readerisalive。Nothingspecialtodonow。Justmovethefirstunflusheditempointertof。更新f的位置}}
写入分析完了,来看看如何读。Checkwhetheritemisavailableforreading。这里面有两个点,一个是检查是否有数据可读,一个是预取inlineboolcheckread(){Wasthevalueprefetchedalready?Ifso,return。if(queue。front()!rr)判断是否在前几次调用read函数时已经预取数据了Theresnoprefetchedvalue,soletusprefetchmorevalues。Prefetchingistosimplyretrievethepointerfromcinatomicfashion。Iftherearenoitemstoprefetch,setctoNULL(usingcompareandswap)。两种情况1。如果c值和queue。front(),返回c值并将c值置为NULL,此时没有数据可读2。如果c值和queue。front(),返回c值,此时可能有数据度的去rc。cas(queue。front(),NULL);尝试预取数据Iftherearenoelementsprefetched,exit。DuringpipeslifetimershouldneverbeNULL,however,itcanhappenduringpipeshutdownwhenitemsarebeingdeallocated。if(queue。front()r!r)判断是否成功预取数据Therewasatleastonevalueprefetched。}Readsanitemfromthepipe。Returnsfalseifthereisnovalue。available。inlineboolread(Tvalue){Trytoprefetchavalue。if(!checkread())Therewasatleastonevalueprefetched。Returnittothecaller。valuequeue。front();queue。pop();}
下面来多分析一下,如果read返回false,那么我们应该怎么做?读失败意味着管道内没有可读的数据,所以我们可以休眠,可以让出cpu,也可以条件等待。
这里最正确的做法是用条件等待。上面的flush返回false代表着读端在等待,那么flush返回false后我们应该通知读端。读端if(yqueue。read(value)){数据处理}else{usleep(100);std::uniquelockstd::mutexlock(ypipemutex);ypipecond。wait(lock);schedyield();}写端yqueue。write(count,false);if(!yqueue。flush()){printf(notifyone);std::uniquelockstd::mutexlock(ypipemutex);ypipecond。notifyone();}
其实我们初略的观察这些函数,就能发现,这几个函数改变的是w,r,f,c这四个指针,下面来看看这四个指针的具体作用吧。
2。5。2w,r,f,c图文结合详解(重点理解)
这里这几个变量非常抽象,要结合着函数来讲Tf:指向下一轮要被刷新的一批元素的第一个。Tw:指向第一个未刷新的元素,只被写线程使用;Tr:指向第一个没有被预提取的元素,只被读线程使用;atomicptrtc:读写线程共享的指针,指向每轮刷新的起点。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)write():写数据,incomplete参数表示写是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。完成后会将fqueue。back();unwrite():在数据没有flush之前可以运反悔Popanincompleteitemfromthepipe。Returnstrueissuchitemexists,falseotherwise。boolflush():将write的元素真正刷新到队列,使读端可以访问对应的数据。返回false意味着读线程在休眠,在这种情况下调者需要唤醒读线程。如果读端阻塞,则否则boolcheckread():检测是否有数据可读,如果cqueue。front则cNULL,否则rcboolread(Tvalue):读数据,将读出的数据写value指针中,返回false意味着没有数据可读
这样写感觉还是非常抽象,下面结合着函数和图来讲这些函数与四个变量的关系吧。
构造函数ypipet()
在构造函数里面,下一轮要被刷新的元素的第一个(f),必然是第一个位置;第一个未刷新的元素(w),也是第一个位置;第一个没有被预读取的元素(r),也是第一个位置;每一轮刷新的起点,也是第一个位置(c);
inlineypipet(){Insertterminatorelementintothequeue。queue。push();yqueuet的尾指针加1,开始backchunk为空,现在backchunk指向第一个chunkt块的第一个位置Letallthepointerstopointtotheterminator。(unlesspipeisdead,inwhichcasecissettoNULL)。rwfqueue。back();就是让r、w、f、c四个指针都指向这个end迭代器c。set(queue。back());}
写入函数write(constTvalue,boolincomplete)
第二个参数决定是否要刷新一批元素,false时,刷新一批元素,那么下一轮要被刷新的元素的第一个(f)就要改变了。
cppWriteanitemtothepipe。Dontflushityet。Ifincompleteissettotruetheitemisassumedtobecontinuedbyitemssubsequentlywrittentothepipe。Incompleteitemsareneverflusheddownthestream。写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。inlinevoidwrite(constTvalue,boolincomplete){Placethevaluetothequeue,addnewterminatorelement。queue。back()queue。push();Movetheflushuptoherepoiter。if(!incomplete){fqueue。back();记录要刷新的位置}}
刷新元素使元素对读线程可见boolflush()
还记得c吗?指向每一轮刷新的起点。如果c和w一样,则尝试将c置为f。刷新元素,指向第一个未刷新的元素(w),那么必然wf了。此时前面的元素都可以被读线程可见。
我们来看看什么情况下c!w。
在未更新前队列没有数据可读,没有数据可读的时候,checkread将c的ptr置为NULL。所以会走下面的流程。返回false的的是告诉调者数据读端(接收端)没有数据可读,可能处于休眠的状态,可以结合condition机制,发送个notify唤醒读端继续读取数据。Trytosetctof。read时如果没有数据可以读取则c的值会被置为NULLif(c。cas(w,f)!w)尝试将c设置为f,即是准备更新w的位置{CompareandswapwasunseccessfulbecausecisNULL。Thismeansthatthereaderisasleep。Thereforewedontcareaboutthreadsafenessandupdatecinnonatomicmanner。Wellreturnfalsetoletthecallerknowthatreaderissleeping。c。set(f);更新为新的f位置线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理}
未更新前队列有数据可读,此时只需要更新w即可,但此时c值不去更新。else读端还有数据可读取{Readerisalive。Nothingspecialtodonow。Justmovethefirstunflusheditempointertof。更新f的位置}
从write和flush我们也可以看出来,在更新w和f的时候并没有互斥的保护,所以此程序插数据的时候不适合于多线程场景。
flush函数主要是将w更新到f的位置,说明已经写到的位置。Flushallthecompleteditemsintothepipe。Returnsfalseifthereaderthreadissleeping。Inthatcase,callerisobligedtowakethereaderupbeforeusingthepipeagain。刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。批量刷新的机制,写入批量后唤醒读线程;反悔机制unwriteinlineboolflush(){Iftherearenounflusheditems,donothing。if(wf)不需要刷新,即是还没有新元素加入Trytosetctof。read时如果没有数据可以读取则c的值会被置为NULLif(c。cas(w,f)!w)尝试将c设置为f,即是准备更新w的位置{CompareandswapwasunseccessfulbecausecisNULL。Thismeansthatthereaderisasleep。Thereforewedontcareaboutthreadsafenessandupdatecinnonatomicmanner。Wellreturnfalsetoletthecallerknowthatreaderissleeping。c。set(f);更新为新的f位置线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理}else读端还有数据可读取{Readerisalive。Nothingspecialtodonow。Justmovethefirstunflusheditempointertof。更新f的位置}}
预取读取函数ckeckread()
如果指针r指向的是队头元素(rqueue。front())或者r没有指向任何元素(NULL)则说明队列中并没有可读的数据,这个时候checkread尝试去预取数据。所谓的预取就是令rc(cas函数就是返回c本身的值,看上关于cas的实现),c在write中被指向f(上图),这时从queue。front()到f这个位置的数据都被预取出来了,然后每次调read都能取出段。
值得注意的是,当cqueue。front()时,代表数据被取完了,这时把c指向NULL,接着读线程会睡眠,这也是给写线程检查读线程是否睡眠的标志(c指向NULL)。
继续上写AB数据的场景,第次调read时,会先checkread,把指针r指向指针c的位置(所谓的预取),这时r,c,w,f的关系如下:
为什么要预读取?当front()和r相等时:rc。cas(queue。front(),NULL);执行之前,如果写端没有flush,那么c置为NULL,说明没有数据可读,返回false。rc。cas(queue。front(),NULL);执行之前,如果写端调用flush,那么c就不等于front(),则r返回了新的f值,最终返回true。Checkwhetheritemisavailableforreading。这里面有两个点,一个是检查是否有数据可读,一个是预取inlineboolcheckread(){Wasthevalueprefetchedalready?Ifso,return。if(queue。front()!rr)判断是否在前几次调用read函数时已经预取数据了Theresnoprefetchedvalue,soletusprefetchmorevalues。Prefetchingistosimplyretrievethepointerfromcinatomicfashion。Iftherearenoitemstoprefetch,setctoNULL(usingcompareandswap)。两种情况1。如果c值和queue。front(),返回c值并将c值置为NULL,此时没有数据可读2。如果c值和queue。front(),返回c值,此时可能有数据度的去rc。cas(queue。front(),NULL);尝试预取数据Iftherearenoelementsprefetched,exit。DuringpipeslifetimershouldneverbeNULL,however,itcanhappenduringpipeshutdownwhenitemsarebeingdeallocated。if(queue。front()r!r)判断是否成功预取数据Therewasatleastonevalueprefetched。}Readsanitemfromthepipe。Returnsfalseifthereisnovalue。available。inlineboolread(Tvalue){Trytoprefetchavalue。if(!checkread())Therewasatleastonevalueprefetched。Returnittothecaller。valuequeue。front();queue。pop();}总结
c指针,则是读写线程都可以操作,因此需要使原的CAS操作来修改,它的可能值有以下种:NULL:读线程设置,此时意味着已经没有数据可读,读线程在休眠。零:写线程设置,这区分两种情况:旧值为w的情况下,cas(w,f)操作修改为f,意味着如果原先的值为w,则原性的修改为f,表示有更多已被刷新的数据可读。在旧值为NULL的情况下,此时读线程休眠,因此可以安全的设置为当前f指针的位置。
写端yquque。write(count,false);将fqueue。back();写端yquque。flush();如果cw,则否则读端checkread();如果cqueue。front则cNULL否则r更新为f。3。ZMQ无锁队列1写1读性能测试
这里分三种测试情况:一次写就提交,read失败就usleep10次写才提交,read失败就yieldflush失败就notify,read失败就wait
可以看到用cond是效率是最高的,usleep的情况和yield的情况类似,实时性没有cond高。并且按照道理来说,正确的使用方法也是用cond
下面来看一看互斥锁队列vs互斥锁条件变量队列vs内存屏障链表vsRingBufferCAS实现。可以看到在一个写线程一个读线程的情况下,我们的ZMQ无锁队列是最快的。
那么在一写一读的场景下,我们就优先选用ZMQ无锁队列即可
4。如何实现多写多读的无锁队列?
后续的多写多读的无锁队列由下一篇文章再来介绍。
投诉 评论
黑化病娇类小说有哪些(18本黑化偏执病娇男主小说合集)今天带来了黑化病娇类小说,这种类型的不知道大家喜不喜欢。其实,黑化病娇类小说每一部剧情都非常的好,相信不会让大家喜欢的。1黑化病娇类小说:4本超带感的病娇黑化女主文4本超……
百年经典绝壁凌空一座隐匿深山的世界名桥穿越崇山峻岭的滇越米轨铁路被称为世界上最艰巨的三大工程之一,而最艰难的一段当属飞架于两山绝壁间的人字桥。为了一睹这座百年前在悬崖峭壁上架起的天险之桥,我们不畏道路艰险,来……
具俊晔去哪了汪小菲和大S的离婚售后大战告一段落,而作为现任丈夫的具俊晔自始至终,未发一言,倒是将这件事与自己摘得特别干净,生怕惹上是非,脚底也麻利,说跑就跑,韩国美国的,汪小菲都无法追上。……
微信成网络频繁诈骗工具这到底是怎么回事呢?近日,有报告显示,2016年至2018年网络诈骗犯罪案件已结4。8万余件,并且案件数量在逐年上升,其中,微信网络诈骗成了最频繁的诈骗工具,这到底是怎么回事呢?一起来看一下。……
四姑娘山首次拍到野生雪豹野生雪豹清晰影像曝光近日,在四姑娘山国家级自然保护区首次拍到野生雪豹,野生雪豹清晰影像曝光,一起来看一下现场的图片。四姑娘山首次拍到野生雪豹11月17日,保护区工作人员在清理安置在海子……
贾跃亭和甘薇离婚什么情况贾跃亭离婚原因竟是这样近日,乐视创始人贾跃亭日前正在美国申请个人破产重组,并在10月11日,贾跃亭与妻子甘薇在成都锦江区人面法院申请离婚,男方已支付51万美元,贾跃亭为什么要……
买鹌鹑蛋忘记煮孵出一窝小鹌鹑网友吃蛋是不是就是杀生近日,一则买鹌鹑蛋忘记煮孵出一窝小鹌鹑的消息登上了热搜榜,在网上引起了很多人的关注,这到底是怎么回事呢?不少网友看到这条消息之后,都纷纷疑虑道:吃蛋是不……
75岁卡米拉竟然喜欢穿蕾丝,优雅又灵动,连伞都是精致的蕾丝小关注我,看王室穿搭!卡米拉也算是英国王室的传奇人物了,从人人喊打的过街老鼠,如今成为了高高在上的王后,与查尔斯三世相伴到老。或许是因为有戴安娜王妃的对比,卡米拉的颜……
痔疮的最佳治疗方法痔疮的日常预防很重要痔疮是肛肠科最常见的疾病之一,同时,如果得了痔疮将是一件令人头疼的事,那么痔疮的最佳治疗方法是什么呢?下面整理了一些护理方法技巧,来了解了解。痔疮的最佳治疗方法都说十人九……
顺丰回应私拆包裹具体内容这究竟是个什么梗?针对快递员私拆包裹,把顾客内衣化妆品被摆拍发朋友圈?近日,顺丰回应私拆包裹顺丰这样回应,一起来看看。顺丰回应私拆包裹据悉,23日19时20分顺丰集团再次在微博上就此事发表……
5个神奇的星期六因为有这2个巧合就莫名神奇了不知不觉间2019年就悄然离开了,2020年已经到来,新年新气象,祝大家新年快乐!不过,网曝今年有5个神奇的星期六,这是怎么回事呢?为什么星期六说是神奇的?据悉,2020年有五……
故事连环画兄弟俩。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。……