ConcurrentHashMap是concurrent并发包下重要的工具类,它的设计和实现非常的巧妙,它将数据分段,每段数据使用一个AQS锁,从而减小了锁的粒度。1。ConcurrentHashMap的结构 结构图 一个ConcurrentHashMap是由多个Segment(段)组成的。Segment类继承了ReentrantLock类,这意味着每个Segment拥有了一个AQS,多个线程的操作落到同一个Segment上时,发生了锁的竞争。ConcurrentHashMap默认有16个Segment,在初始化之后,Segment个数不可修改。 一个Segment包含了一个HashEntry的数组,每个HashEntry都是一个单向链表,HashEntry的数组可以扩容,扩容后数组的长度为原来的2倍。HashEntry类如下图所示: 我们看到value和next都是volatile修饰的,这保证了数据的可见性。2。put方法详解publicVput(Kkey,Vvalue){SegmentK,Vs;if(valuenull)thrownewNullPointerException();计算key的hash值inthashhash(key);hash为32位无符号数,segmentShift默认为28,向右移28位,剩下高4位然后和segmentMask(默认值为15)做与运算,结果还是高4位intj(hashsegmentShift)segmentMask;if((s(SegmentK,V)UNSAFE。getObjectnonvolatile;recheck(segments,(jSSHIFT)SBASE))null)inensureSegment对segment〔j〕进行初始化sensureSegment(j);放入到对应的段内returns。put(key,hash,value,false);} 第一步是根据hash值快速获取到相应的Segment,第二步就是Segment内部的put操作了。finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){获取该segment的独占锁HashEntryK,VnodetryLock()?null:scanAndLockForPut(key,hash,value);VoldValue;try{HashEntryK,V〔〕tabtable;用hash值和(数组长度1)做与运算,得出数组的下标intindex(tab。length1)hash;first是数组该位置处的链表的表头HashEntryK,VfirstentryAt(tab,index);for(HashEntryK,Vefirst;;){判断是不是到了尾部,尾部nullif(e!null){Kk;key相同,值更新if((ke。key)key(e。hashhashkey。equals(k))){oldValuee。value;if(!onlyIfAbsent){e。valuevalue;modCount;}break;}继续下一个节点ee。next;}else{node不为空,则node作为头节点,使用的是头插法if(node!null)node。setNext(first);elsenodenewHashEntryK,V(hash,key,value,first);intccount1;如果超过了该segment的阈值,这个segment需要扩容if(cthresholdtab。lengthMAXIMUMCAPACITY)rehash(node);else没有达到阈值,将node放到数组tab的index位置,其实就是将新的节点设置成原链表的表头setEntryAt(tab,index,node);modCount;countc;oldValuenull;break;}}}finally{unlock();}returnoldValue;} 如何进行扩容:privatevoidrehash(HashEntryK,Vnode){HashEntryK,V〔〕oldTabletable;intoldCapacityoldTable。length;扩容为原来的2倍intnewCapacityoldCapacity1;计算阀值threshold(int)(newCapacityloadFactor);HashEntryK,V〔〕newTable(HashEntryK,V〔〕)newHashEntry〔newCapacity〕;intsizeMasknewCapacity1;循环旧的HashEntry数组for(inti0;ioldCapacity;i){HashEntryK,VeoldTable〔i〕;if(e!null){HashEntryK,Vnexte。next;intidxe。hashsizeMask;该链表上只有一个节点if(nextnull)SinglenodeonlistnewTable〔idx〕e;else{ReuseconsecutivesequenceatsameslotHashEntryK,VlastRune;intlastIdxidx;for(HashEntryK,Vlastnext;last!null;lastlast。next){计算在新数组中的下标intklast。hashsizeMask;当前节点的下标和上一个节点的下标不一致时,修改最终节点值注意如果后面的节点和前面的节点下标一致,那么后面的节点保持原有的顺序,直接带到新tab〔k〕的链表中if(k!lastIdx){lastIdxk;lastRunlast;}}采用头插法,最后一个节点作为头节点newTable〔lastIdx〕lastRun;重新计算节点在数组中的位置,采用头插法插入到链表for(HashEntryK,Vpe;p!lastRun;pp。next){Vvp。value;inthp。hash;intkhsizeMask;HashEntryK,VnnewTable〔k〕;newTable〔k〕newHashEntryK,V(h,p。key,v,n);}}}}添加新节点intnodeIndexnode。hashsizeMask;addthenewnodenode。setNext(newTable〔nodeIndex〕);newTable〔nodeIndex〕node;将newTable赋值给该segment的tabletablenewTable;} 自旋获取aqs锁:privateHashEntryK,VscanAndLockForPut(Kkey,inthash,Vvalue){HashEntryK,VfirstentryForHash(this,hash);HashEntryK,Vefirst;HashEntryK,Vnodenull;intretries1;negativewhilelocatingnode如果尝试加锁失败,那么就对segment〔hash〕对应的链表进行遍历找到需要put的这个entry所在的链表中的位置,这里之所以进行一次遍历找到坑位,主要是为了通过遍历过程将遍历过的entry全部放到CPU高速缓存中,这样在获取到锁了之后,再次进行定位的时候速度会十分快,这是在线程无法获取到锁前并等待的过程中的一种预热方式。while(!tryLock()){HashEntryK,Vf;torecheckfirstbelow获取锁失败,初始时retries1必然开始先进入第一个ifif(retries0){enull代表两种意思,1。第一种就是遍历链表到了最后,仍然没有发现指定key的entry;2。第二种情况是刚开始时entryForHash(通过hash找到的table中对应位置链表的结点)找到的HashEntry就是空的if(enull){当然这里之所以还需要对nodenull进行判断,是因为有可能在第一次给node赋值完毕后,然后预热准备工作已经搞定,然后进行循环尝试获取锁,在循环次数还未达到264次以前,某一次在条件3判断时发现有其它线程对这个segment进行了修改,那么retries被重置为1,从而再一次进入到1条件内,此时如果再次遍历到链表最后时,因为上一次遍历时已经给node赋值过了,所以这里判断node是否为空,从而避免第二次创建对象给node重复赋值。if(nodenull)speculativelycreatenodenodenewHashEntryK,V(hash,key,value,null);retries0;}elseif(key。equals(e。key))retries0;elseee。next;}elseif(retriesMAXSCANRETRIES){尝试获取锁次数超过设置的最大值,直接进入阻塞等待,这就是所谓的有限制的自旋获取锁,之所以这样是因为如果持有锁的线程要过很久才释放锁,这期间如果一直无限制的自旋其实是对cpu性能有消耗的,这样无限制的自旋是不利的,所以加入最大自旋次数,超过这个次数则进入阻塞状态等待对方释放锁并获取锁lock();break;}遍历过程中,有可能其它线程改变了遍历的链表,这时就需要重新进行遍历了。判断是否初始化了结点并且判断链表头结点是否改变(1。7使用头插法)elseif((retries1)0(fentryForHash(this,hash))!first){efirstf;retraverseifentrychangedretries1;}}returnnode;} 这个方法用了一个while循环去获取aqs锁,有两种情况需要说明下: 1。如果尝试的次数超过了最大自旋次数,会进入到aqs的等待队列,避免了cpu的空转。 2。如果在循环的过程中,其他的线程获取到了锁,并且改变了遍历的链表,那么自旋计数器重置为1,从链表的头节点重新开始遍历。3。get方法publicVget(Objectkey){SegmentK,Vs;manuallyintegrateaccessmethodstoreduceoverheadHashEntryK,V〔〕tab;inthhash(key);longu(((hsegmentShift)segmentMask)SSHIFT)SBASE;if((s(SegmentK,V)UNSAFE。getObjectVolatile(segments,u))!null(tabs。table)!null){for(HashEntryK,Ve(HashEntryK,V)UNSAFE。getObjectVolatile(tab,((long)(((tab。length1)h))TSHIFT)TBASE);e!null;ee。next){Kk;if((ke。key)key(e。hashhkey。equals(k)))returne。value;}}returnnull;} get方法并没有加锁,基本思路是: 1。先定位到所在的segment 2。定位对应的segment的tab数组内的位置 3。然后遍历链表元素,如果找到相同的key就返回对应的value4。总结: ConcurrentHashMap1。7采用了分而治之的思想,将数据分段,每个段持有一把aqs锁。 它的写操作都是需要获取锁之后再操作,而读操作不需要获取锁,这也说明它适合读多写少的业务场景。 线程在获取不到aqs锁的情况下,不会立即进入到等待队列,会进行一定次数的自旋。