java集合包LinkedBlockingQueue底层源码
一、demopublicclassLinkedBlockingQueueTest{publicstaticvoidmain(String〔〕args)throwsException{LinkedBlockingQueuelinkedBlockingQueuenewLinkedBlockingQueue(10);linkedBlockingQueue。put(今天头条);System。out。println(linkedBlockingQueue。take());IteratoriteratorlinkedBlockingQueue。iterator();while(iterator。hasNext()){System。out。println(iterator。next());}}}二、构造方法分析
Condition:如果你获取了一把锁,然后调用了Condition。await(),此时会释放锁,当前线程会进入一个cnodition等待队列,事后人家来唤醒你,Cnodition。signal(),这个时候会从condition等待队列中把你加入到wait等待队列里去,然后人家释放锁,就会唤醒wait等待队列里的线程尝试来获取锁;当前队列元素数量privatefinalAtomicIntegercountnewAtomicInteger();Lockheldbytake,poll,etc获取队列元素时加的锁privatefinalReentrantLocktakeLocknewReentrantLock();WaitqueueforwaitingtakesprivatefinalConditionnotEmptytakeLock。newCondition();Lockheldbyput,offer,etc放入元素时加的锁privatefinalReentrantLockputLocknewReentrantLock();WaitqueueforwaitingputsprivatefinalConditionnotFullputLock。newCondition();publicLinkedBlockingQueue(intcapacity){if(capacity0)thrownewIllegalArgumentException();队列最大大小this。capacitycapacity;last和head指针指向空节点lastheadnewNodeE(null);}三、put方法publicvoidput(Ee)throwsInterruptedException{if(enull)thrownewNullPointerException();Note:conventioninallputtakeetcistopresetlocalvarholdingcountnegativetoindicatefailureunlessset。intc1;创建一个单向节点NodeEnodenewNodeE(e);获取添加元素的锁finalReentrantLockputLockthis。putLock;获取队列当前数量finalAtomicIntegercountthis。count;加一个可以被中断的锁,保证同一时间只有一个线程可以添加数据如果加锁线程被中断了,此时加锁会失败,抛出一个线程中断的异常出来putLock。lockInterruptibly();try{Notethatcountisusedinwaitguardeventhoughitisnotprotectedbylock。Thisworksbecausecountcanonlydecreaseatthispoint(allotherputsareshutoutbylock),andwe(orsomeotherwaitingput)aresignalledifiteverchangesfromcapacity。Similarlyforallotherusesofcountinotherwaitguards。判断队列是否已经满了while(count。get()capacity){调用添加元素锁的Condition的await方法进入等待队列进行等待,并且释放锁notFull。await();}设置尾节点的下一个节点为新创建节点将新创建节点设置为尾节点enqueue(node);调用getAndIncrement方法,先获取count数量,在将count队列元素加一ccount。getAndIncrement();判断当前元素小于队列最大数量则唤醒等待线程继续放入元素if(c1capacity)notFull。signal();}finally{释放锁putLock。unlock();}如果队列之前为空的话,则调用signalNotEmpty方法唤醒之前等待消费的线程进行消费if(c0)signalNotEmpty();}staticclassNodeE{Eitem;指向下一个节点NodeEnext;Node(Ex){itemx;}}privatevoidenqueue(NodeEnode){assertputLock。isHeldByCurrentThread();assertlast。nextnull;设置尾节点的下一个节点为新创建节点将新创建节点设置为尾节点lastlast。nextnode;}privatevoidsignalNotEmpty(){finalReentrantLocktakeLockthis。takeLock;takeLock。lock();try{唤醒阻塞的消费线程notEmpty。signal();}finally{takeLock。unlock();}}四、take方法publicEtake()throwsInterruptedException{Ex;intc1;获取队列中元素数量finalAtomicIntegercountthis。count;获取元素出队锁finalReentrantLocktakeLockthis。takeLock;加一个可以被中断的锁,保证同一时间只有一个线程可以添加数据如果加锁线程被中断了,此时加锁会失败,抛出一个线程中断的异常出来takeLock。lockInterruptibly();try{while(count。get()0){notEmpty。await();}获取第一个元素xdequeue();调用getAndIncrement方法,先获取count数量,在将count队列元素加一ccount。getAndDecrement();if(c1)如果当前队列数量大于1则进行唤醒其他等待消费线程notEmpty。signal();}finally{释放锁takeLock。unlock();}判断队列之前是否已经满了,满了说明可能有有线程在等待则调用signalNotFull唤醒等待线程if(ccapacity)signalNotFull();returnx;}privateEdequeue(){asserttakeLock。isHeldByCurrentThread();asserthead。itemnull;NodeEhhead;获取头节点下一个元素,因为头节点一般为空,忽略NodeEfirsth。next;将当前头节点的下一个元素指向自己,后续会gc回收掉h。nexth;helpGC在将下一个元素设置为头节点headfirst;获取数据Exfirst。item;将新的头节点元素设置为空,保证头节点元素一直为空first。itemnull;返回数据returnx;}privatevoidsignalNotFull(){finalReentrantLockputLockthis。putLock;putLock。lock();try{notFull。signal();}finally{putLock。unlock();}}五、iterator方法publicIteratorEiterator(){returnnewItr();}Itr(){将队列中获取元素和添加元素的锁进行加锁fullyLock();try{currenthead。next;if(current!null)currentElementcurrent。item;}finally{对获取元素锁和添加元素锁进行释放fullyUnlock();}}publicEnext(){将队列中获取元素和添加元素的锁进行加锁fullyLock();try{if(currentnull)thrownewNoSuchElementException();ExcurrentElement;lastRetcurrent;获取下一个元素currentnextNode(current);currentElement(currentnull)?null:current。item;返回元素returnx;}finally{对获取元素锁和添加元素锁进行释放fullyUnlock();}}voidfullyLock(){putLock。lock();takeLock。lock();}voidfullyUnlock(){takeLock。unlock();putLock。unlock();}privateNodeEnextNode(NodeEp){for(;;){NodeEsp。next;if(sp)returnhead。next;if(snulls。item!null)returns;ps;}}