概述 以前参加过一个库存系统,由于其业务复杂性,搞了很多个应用来支撑。这样的话一份库存数据就有可能同时有多个应用来修改库存数据。 比如说,有定时任务域xx。cron,和SystemA域和SystemB域这几个JAVA应用,可能同时修改同一份库存数据。如果不做协调的话,就会有脏数据出现。 对于跨JAVA进程的线程协调,可以借助外部环境,例如DB或者Redis。下文介绍一下如何使用DB来实现分布式锁。设计 本文设计的分布式锁的交互方式如下:根据业务字段生成transactionid,并线程安全的创建锁资源根据transactionid申请锁释放锁动态创建锁资源 在使用synchronized关键字的时候,必须指定一个锁对象。synchronized(obj){} 进程内的线程可以基于obj来实现同步。obj在这里可以理解为一个锁对象。如果线程要进入synchronized代码块里,必须先持有obj对象上的锁。这种锁是JAVA里面的内置锁,创建的过程是线程安全的。那么借助DB,如何保证创建锁的过程是线程安全的呢? 可以利用DB中的UNIQUEKEY特性,一旦出现了重复的key,由于UNIQUEKEY的唯一性,会抛出异常的。在JAVA里面,是SQLIntegrityConstraintViolationException异常。createtabledistributedlock(idBIGINTUNSIGNEDPRIMARYKEYAUTOINCREMENTCOMMENT自增主键,transactionidvarchar(128)NOTNULLDEFAULTCOMMENT事务id,lastupdatetimeTIMESTAMPDEFAULTCURRENTTIMESTAMPONUPDATECURRENTTIMESTAMPNOTNULLCOMMENT最后更新时间,createtimeTIMESTAMPDEFAULT0000000000:00:00NOTNULLCOMMENT创建时间,UNIQUEKEYidxtransactionid(transactionid)) transactionid是事务Id,比如说,可以用 仓库条码销售模式 来组装一个transactionid,表示某仓库某销售模式下的某个条码资源。不同条码,当然就有不同的transactionid。如果有两个应用,拿着相同的transactionid来创建锁资源的时候,只能有一个应用创建成功。 一条distributedlock记录插入成功了,就表示一份锁资源创建成功了。DB连接池列表设计 在写操作频繁的业务系统中,通常会进行分库,以降低单数据库写入的压力,并提高写操作的吞吐量。如果使用了分库,那么业务数据自然也都分配到各个数据库上了。 在这种水平切分的多数据库上使用DB分布式锁,可以自定义一个DataSouce列表。并暴露一个getConnection(StringtransactionId)方法,按照transactionId找到对应的Connection。 实现代码如下:packagedlock;importcom。alibaba。druid。pool。DruidDataSource;importorg。springframework。stereotype。Component;importjavax。annotation。PostConstruct;importjava。io。FileInputStream;importjava。io。IOException;importjava。sql。Connection;importjava。util。ArrayList;importjava。util。List;importjava。util。Properties;ComponentpublicclassDataSourcePool{privateListDruidDataSourcedlockDataSourcesnewArrayList();PostConstructprivatevoidinitDataSourceList()throwsIOException{PropertiespropertiesnewProperties();FileInputStreamfisnewFileInputStream(db。properties);properties。load(fis);IntegerlockNumInteger。valueOf(properties。getProperty(DLOCKNUM));for(inti0;ilockNum;i){Stringuserproperties。getProperty(DLOCKUSERi);Stringpasswordproperties。getProperty(DLOCKPASSi);IntegerinitSizeInteger。valueOf(properties。getProperty(DLOCKINITSIZEi));IntegermaxSizeInteger。valueOf(properties。getProperty(DLOCKMAXSIZEi));Stringurlproperties。getProperty(DLOCKURLi);DruidDataSourcedataSourcecreateDataSource(user,password,initSize,maxSize,url);dlockDataSources。add(dataSource);}}privateDruidDataSourcecreateDataSource(Stringuser,Stringpassword,IntegerinitSize,IntegermaxSize,Stringurl){DruidDataSourcedataSourcenewDruidDataSource();dataSource。setDriverClassName(com。mysql。jdbc。Driver);dataSource。setUsername(user);dataSource。setPassword(password);dataSource。setUrl(url);dataSource。setInitialSize(initSize);dataSource。setMaxActive(maxSize);returndataSource;}publicConnectiongetConnection(StringtransactionId)throwsException{if(dlockDataSources。size()0){returnnull;}if(transactionIdnull。equals(transactionId)){thrownewRuntimeException(transactionId是必须的);}inthascodetransactionId。hashCode();if(hascode0){hascodehascode;}returndlockDataSources。get(hascodedlockDataSources。size())。getConnection();}} 首先编写一个initDataSourceList方法,并利用Spring的PostConstruct注解初始化一个DataSource列表。相关的DB配置从db。properties读取。DLOCKNUM2DLOCKUSER0user1DLOCKPASS0pass1DLOCKINITSIZE02DLOCKMAXSIZE010DLOCKURL0jdbc:mysql:localhost:3306test1DLOCKUSER1user1DLOCKPASS1pass1DLOCKINITSIZE12DLOCKMAXSIZE110DLOCKURL1jdbc:mysql:localhost:3306test2 DataSource使用阿里的DruidDataSource。 接着最重要的一个实现getConnection(StringtransactionId)方法。实现原理很简单,获取transactionId的hashcode,并对DataSource的长度取模即可。 连接池列表设计好后,就可以实现往distributedlock表插入数据了。packagedlock;importorg。springframework。beans。factory。annotation。Autowired;importorg。springframework。stereotype。Component;importjava。sql。;ComponentpublicclassDistributedLock{AutowiredprivateDataSourcePooldataSourcePool;根据transactionId创建锁资源publicStringcreateLock(StringtransactionId)throwsException{if(transactionIdnull){thrownewRuntimeException(transactionId是必须的);}Connectionconnectionnull;Statementstatementnull;try{connectiondataSourcePool。getConnection(transactionId);connection。setAutoCommit(false);statementconnection。createStatement();statement。executeUpdate(INSERTINTOdistributedlock(transactionid)VALUES(transactionId));connection。commit();returntransactionId;}catch(SQLIntegrityConstraintViolationExceptionicv){说明已经生成过了。if(connection!null){connection。rollback();}returntransactionId;}catch(Exceptione){if(connection!null){connection。rollback();}throwe;}finally{if(statement!null){statement。close();}if(connection!null){connection。close();}}}}根据transactionId锁住线程 接下来利用DB的selectforupdate特性来锁住线程。当多个线程根据相同的transactionId并发同时操作selectforupdate的时候,只有一个线程能成功,其他线程都block住,直到selectforupdate成功的线程使用commit操作后,block住的所有线程的其中一个线程才能开始干活。 我们在上面的DistributedLock类中创建一个lock方法。publicbooleanlock(StringtransactionId)throwsException{Connectionconnectionnull;PreparedStatementpreparedStatementnull;ResultSetresultSetnull;try{connectiondataSourcePool。getConnection(transactionId);preparedStatementconnection。prepareStatement(SELECTFROMdistributedlockWHEREtransactionid?FORUPDATE);preparedStatement。setString(1,transactionId);resultSetpreparedStatement。executeQuery();if(!resultSet。next()){connection。rollback();returnfalse;}returntrue;}catch(Exceptione){if(connection!null){connection。rollback();}throwe;}finally{if(preparedStatement!null){preparedStatement。close();}if(resultSet!null){resultSet。close();}if(connection!null){connection。close();}}}实现解锁操作 当线程执行完任务后,必须手动的执行解锁操作,之前被锁住的线程才能继续干活。在我们上面的实现中,其实就是获取到当时selectforupdate成功的线程对应的Connection,并实行commit操作即可。 那么如何获取到呢?我们可以利用ThreadLocal。首先在DistributedLock类中定义privateThreadLocalConnectionthreadLocalConnnewThreadLocal(); 每次调用lock方法的时候,把Connection放置到ThreadLocal里面。我们修改lock方法。publicbooleanlock(StringtransactionId)throwsException{Connectionconnectionnull;PreparedStatementpreparedStatementnull;ResultSetresultSetnull;try{connectiondataSourcePool。getConnection(transactionId);threadLocalConn。set(connection);preparedStatementconnection。prepareStatement(SELECTFROMdistributedlockWHEREtransactionid?FORUPDATE);preparedStatement。setString(1,transactionId);resultSetpreparedStatement。executeQuery();if(!resultSet。next()){connection。rollback();threadLocalConn。remove();returnfalse;}returntrue;}catch(Exceptione){if(connection!null){connection。rollback();threadLocalConn。remove();}throwe;}finally{if(preparedStatement!null){preparedStatement。close();}if(resultSet!null){resultSet。close();}if(connection!null){connection。close();}}} 这样子,当获取到Connection后,将其设置到ThreadLocal中,如果lock方法出现异常,则将其从ThreadLocal中移除掉。 有了这几步后,我们可以来实现解锁操作了。我们在DistributedLock添加一个unlock方法。publicvoidunlock()throwsException{Connectionconnectionnull;try{connectionthreadLocalConn。get();if(!connection。isClosed()){connection。commit();connection。close();threadLocalConn。remove();}}catch(Exceptione){if(connection!null){connection。rollback();connection。close();}threadLocalConn。remove();throwe;}}缺点 毕竟是利用DB来实现分布式锁,对DB还是造成一定的压力。当时考虑使用DB做分布式的一个重要原因是,我们的应用是后端应用,平时流量不大的,反而关键的是要保证库存数据的正确性。对于像前端库存系统,比如添加购物车占用库存等操作,最好别使用DB来实现分布式锁了。进一步思考 如果想锁住多份数据该怎么实现?比如说,某个库存操作,既要修改物理库存,又要修改虚拟库存,想锁住物理库存的同时,又锁住虚拟库存。其实也不是很难,参考lock方法,写一个multiLock方法,提供多个transactionId的入参,for循环处理就可以了。这个后续有时间再补上。