今天从源码的角度分析下进程间通信之管道(pipe)。 什么是管道? 所谓管道,是指用于连接一个读进程和一个写进程,以实现它们之间通信的共享文件,又称pipe文件。 向管道(共享文件)提供输入的发送进程(即写进程),以字符流形式将大量的数据送入管道;而接收管道输出的接收进程(即读进程),可从管道中接收数据。由于发送进程和接收进程是利用管道进行通信的,故又称管道通信。 为了协调双方的通信,管道通信机制必须提供以下3方面的协调能力。互斥。当一个进程正在对pipe进行读写操作时,另一个进程必须等待。同步。当写(输入)进程把一定数量(如4KB)数据写入pipe后,便去睡眠等待,直到读(输出)进程取走数据后,再把它唤醒。当读进程读到一空pipe时,也应睡眠等待,直至写进程将数据写入管道后,才将它唤醒。对方是否存在。只有确定对方已存在时,才能进行通信。 管道的应用 管道是利用pipe()系统调用而不是利用open()系统调用建立的。pipe()调用的原型是:intpipe(intfd〔2〕) 我们看到,有两个文件描述符与管道结合在一起,一个文件描述符用于管道的read()端,一个文件描述符用于管道的write()端。 由于一个函数调用不能返回两个值,pipe()的参数是指向两个元素的整型数组的指针,它将由调用两个所要求的文件描述符填入。 fd〔0〕元素将含有管道read()端的文件描述符,而fd〔1〕含有管道write()端的文件描述符。系统可根据fd〔0〕和fd〔1〕分别找到对应的file结构。 注意,在pipe的参数中,没有路径名,这表明,创建管道并不像创建文件一样,要为它创建一个目录连接。这样做的好处是,其他现存的进程无法得到该管道的文件描述符,从而不能访问它。 那么,两个进程如何使用一个管道来通信呢? 我们知道,fork()和exec()系统调用可以保证文件描述符的复制品既可供双亲进程使用,也可供它的子女进程使用。也就是说,一个进程用pipe()系统调用创建管道,然后用fork()调用创建一个或多个进程,那么,管道的文件描述符将可供所有这些进程使用。 这里更明确的含义是:一个普通的管道仅可供具有共同祖先的两个进程之间共享,并且这个祖先必须已经建立了供它们使用的管道。 注意,在管道中的数据始终以和写数据相同的次序来进行读,这表示lseek()系统调用对管道不起作用。 下面给出在两个进程之间设置和使用管道的简单程序: includestdio。hincludeunistd。hincludesystypes。hintmain(void){intfd〔2〕,charstring〔〕Hello,world!;charreadbuffer〔80〕;pipe(fd);if((childpidfork())1){printf(Error:fork);exit(1);}if(childpid0)子进程是管道的读进程{close(fd〔1〕);关闭管道的写端nbytesread(fd〔0〕,readbuffer,sizeof(readbuffer));printf(Receivedstring:s,readbuffer);exit(0);}else父进程是管道的写进程{close(fd〔0〕);关闭管道的读端write(fd〔1〕,string,strlen(string));}return(0);} 注意,在这个例子中,为什么这两个进程都关闭它所不需的管道端呢? 这是因为写进程完全关闭管道端时,文件结束的条件被正确地传递给读进程。而读进程完全关闭管道端时,写进程无需等待继续写数据。 阻塞读和写分别成为对空和满管道的默认操作,这些默认操作也可以改变,这就需要调用fcntl()系统调用,对管道文件描述符设置ONONBLOCK标志可以忽略默认操作:includefcntl。hfcntl(fd,FSETFL,ONONBlOCK); 上述例子如下图: 管道的源码分析 pipefs初始化 pipefs是一种简单的、虚拟的文件系统类型,因为它没有对应的物理设备,因此其安装时不需要块设备。大部分文件系统是以模块的形式来实现的。该文件系统相关的代码在fspipe。c中: staticstructfilesystemtypepipefstype{。namepipefs,。getsbpipefsgetsb,。killsbkillanonsuper,};staticintinitinitpipefs(void){pipefstype链接到filesystems链表可以通过读procfilesystems找到pipefs入口点,在那里,nodev标志表示没有设置FSREQUIRESDEV标志,即该文件系统没有对应的物理设备。interrregisterfilesystem(pipefstype);if(!err){安装pipefs文件系统pipemntkernmount(pipefstype);if(ISERR(pipemnt)){errPTRERR(pipemnt);unregisterfilesystem(pipefstype);}}}pipefs文件系统是作为一个模块来安装的fsinitcall(initpipefs);moduleexit(exitpipefs);模块卸载函数 上述就是初始化时注册pipefs文件系统的过程,操作如下:把pipefstype链接到filesystems链表中。创建一个structvfsmount结构,然后把该结构赋值给全局变量pipemnt。该vfsmount结构通过superblock与pipefs文件系统进行了关联。 pipe的创建 pipefs文件系统的入口点就是pipe()系统调用,其内核实现函数为syspipe(),而真正的工作是调用dopipe()函数来完成的,其代码在fspipe。c中:intdopipe(intfd){structfilefw,intfdw,创建管道写端的file结构fwcreatewritepipe();在写端的file结构基础上构建读端frcreatereadpipe(fw);创建读端fdfdrgetunusedfd();创建写端fdfdwgetunusedfd();fd和file进行关联fdinstall(fdr,fr);fdinstall(fdw,fw);返回读写端fdfd〔0〕fd〔1〕。。。return0;}structfilecreatewritepipe(void){。。。structqstrname{。name};创建file结构fgetemptyfilp();创建一个pipe相关的inodeinodegetpipeinode();创建一个dentry结构dentrydalloc(pipemntmntsbsroot,name);inode和dentry相关联dinstantiate(dentry,inode);pipe和pipemnt关联ffpath。mntmntget(pipemnt);file与dentry相关联ffpath。该file是只写的ffflagsOWRONLY;该pipe的可操作方法。。。}structfilecreatereadpipe(structfilewrf){创建一个file结构用于读structfilefgetemptyfilp();file与已有的dentry、inode、structvfsmount相关联ffpath。mntmntget(wrffpath。mnt);ffpath。dentrydget(wrffpath。dentry);ffmappingwrffpath。该file是只读的ffflagsORDONLY;ffmodeFMODEREAD;ffversion0;} dopipe()的操作也很简单,操作如下:创建管道的读写端关联的file、inode、dentry结构。把fd和file进行联系,并返回给用户关于管道的读写描述符。创建一个pipeinodeinfo对象,该结构指向具体的数据页内存缓冲区。后续所有向管道的读写均操作于该数据页内存缓冲区中。 上述pipe的初始化中,创建的pipeinodeinfo对象记录了pipe读写过程中所有的得数据。pipeinodeinfo对象的结构如下structpipeinodeinfo{存储等待读写进程的等待队列nrbufs:写入但还未被读取的数据占用缓冲区的页数curbuf:当前正在读取环形缓冲区中的页节点unsignedintnrbufs,临时缓冲区页面正在读取pipe的读进程数目正在写pipe的写进程数目等待管道可以写的进程数目。。。pipe对应的inode结构structpipebufferbufs〔PIPEBUFFERS〕;环形缓冲区,每个元素对应一个内存页}; 经过上述的一系列初始化,整个管道的内存结构如下图所示 piperead staticssizetpiperead(structkiocbiocb,conststructioveciov,unsignedlongnrsegs,lofftpos){。。。要读的数据长度totalleniovlength(iov,nrsegs);dowakeup0;ret0;读之前先加锁mutexlock(inodeimutex);循环读数据for(;;){if(bufs){待读的数据比要读的数据多,则设置要读的长度if(charstotallen)erroropsconfirm(pipe,buf);if(error){if(!ret)}atomic!iovfaultinpageswrite(iov,chars);redo:把数据拷贝到用户缓冲区addropsmap(pipe,buf,atomic);errorpipeiovcopytouser(iov,addrbufoffset,chars,atomic);opsunmap(pipe,buf,addr);if(unlikely(error)){。。。}该页的数据全部读完,释放该pageif(!buflen){bufopsNULL;opsrelease(pipe,buf);curbuf(curbuf1)(PIPEBUFFERS1);dowakeup1;}if(!totallen)commonpath:readsucceeded}if(bufs)若该页没读完,继续循环读,若该page读完,则读下一个pageif(bufs)Moretodo?if(!pipewriters)if(!pipewaitingwriters){if(ret)非阻塞跳出循环,不进行休眠if(filpfflagsONONBLOCK){retEAGAIN;}}if(signalpending(current)){if(!ret)retERESTARTSYS;}唤醒写进程if(dowakeup){wakeupinterruptiblesync(pipewait);killfasync(pipefasyncwriters,SIGIO,POLLOUT);}让出cpu,进行休眠,等待条件唤醒pipewait(pipe);}formutexunlock(inodeimutex);if(dowakeup){wakeupinterruptiblesync(pipewait);killfasync(pipefasyncwriters,SIGIO,POLLOUT);}if(ret0)fileaccessed(filp);} 读的操作很简单,操作如下:读之前先锁定内存。从缓冲区中获取数据页,把页中数据拷贝到用户缓冲区中。数据全部被读完,则发送信号唤醒写进程,同时读进程让出CPU进行休眠。 读取数据的过程如下 pipewrite staticssizetpipewrite(structkiocbiocb,conststructioveciov,unsignedlongnrsegs,lofftppos){。。。需要写入数据的长度totalleniovlength(iov,nrsegs);dowakeup0;mutexlock(inodeimutex);管道读端已关闭,返回SIGPIPE信号if(!pipereaders){sendsig(SIGPIPE,current,0);retEPIPE;}Wetrytomergesmallwritescurbuf:当前的pipe缓冲节点nrbufs:非空的pipe缓冲节点数目buffers:buf缓冲区总数目bufoffset:页内可用数据的偏移量buflen:可用数据的长度bufoffsetbuflen:页内可以往有效数据后追加数据的下标获取完整页之外的数量charstotallen(PAGESIZE1);sizeofthelastbufferif(pipenrbufschars!0){获取下一个可用的缓冲区,比如缓冲区是05,有效数据起始buff是3,有效数据是4,那么存储buff数据依次为3,4,5,0,下一个可用的buff为1((341)5)intlastbuf(pipecurbufpipenrbufs1)(PIPEBUFFERS1);获取下一个可用的buff,lastbuf为获取到的索引offset:page中数据的偏移量。len:page中数据的长度目前数据在page中的有效起始地址有效数据长度下一个可存放数据的地址。管道是从前往后读的,并没规定读写大小,有可能只读取了page的前一部分,中间部分尚未读取,但是写的时候必须从中间有效数据后继续写当前需要写入的数据已有的数据若没有超过PAGESIZE大小,则拷贝到page中if(opscanmergeoffsetcharsPAGESIZE){。。。redo1:addropsmap(pipe,buf,atomic);将用户数据拷贝到page中errorpipeiovcopyfromuser(offsetaddr,iov,chars,atomic);opsunmap(pipe,buf,addr);dowakeup1;if(error){。。。}更新有效数据全拷贝完则跳出if(!totallen)}}for(;;){if(!pipereaders){。。。}获取管道还有多少有效的buffer缓冲区if(bufsPIPEBUFFERS){有效的bufs小于缓冲区总数intnewbuf(pipecurbufbufs)(PIPEBUFFERS1);获取下一个可用的if(!page){pageallocpage(GFPHIGHUSER);if(unlikely(!page)){。。。}}dowakeup1;charsPAGESIZE;if(charstotallen)iovfaultinpagesread(iov,chars);redo2:errorpipeiovcopyfromuser(src,iov,chars,atomic);更新有效数据bufoffset0;pipetmppageNULL;数据写完,跳出循环结束if(!totallen)}还有可用的缓冲区,继续写if(bufsPIPEBUFFERS)缓冲区全部写完了,则判断是否需要阻塞休眠if(filpfflagsONONBLOCK){if(!ret)retEAGAIN;}if(signalpending(current)){if(!ret)retERESTARTSYS;}唤醒读进程if(dowakeup){wakeupinterruptiblesync(pipewait);killfasync(pipefasyncreaders,SIGIO,POLLIN);dowakeup0;}写进程休眠pipewait(pipe);将当前任务加入到等待列表,释放锁,让出CPU}out:。。。} 管道写操作流程如下:写之前先锁定内存。获取可写的缓冲区,若可写则循环写。若不可写,则唤醒读进程,同时写进程进行休眠,让出CPU。 往管道中写数据的过程如下 管道中最重要的2个方法就是管道的读写。从上述的分析来看,读写进程共同操作内核中的数据缓冲区,若有缓冲区可写,则进程往缓冲区中写,若条件不允许写,则进程休眠让出CPU。读操作同理。 从上述管道读写操作可知,父子进程之所以能够通过pipe进行通信,是因为在内核中共同指向了同一个pipeinodeinfo对象,共同操作同一个内存页。 总结 管道也称无名管道,是UNIX系统中进程间通信(IPC)中的一种。管道由于是无名管道,因此只能在有亲缘关系的进程间使用。管道不是普通的文件,它是基于内存的。管道属于半双工,数据只能从一方流向另一方,也即数据只能从一端写,从另一端读。管道中读数据是一次性的操作,数据读取后就会释放空间,让出空间供更多的数据写。管道写数据遵循先入先出的原则。