分布式学习之zookeeper3
分布式学习之zookeeper3
分布式很多地方都会用到zk,虽然这个技术出了很久,但是作为应用开发工程师可能这方面接触的还是比较少。
我打算从浅入深的学习下zk的使用。Java操作zk的几种方式
java操作zk可以使用3种方式使用原生zkClinetcurator
开始写代码前请务必启动启动zookeeper(虽然是废话,但是还要提醒一下)。
另外最好开一个客户端用来观察调用代码后的信息。原生
maven引用dependencygroupIdorg。apache。zookeepergroupIdzookeeperartifactIdversion3。4。8versiondependency
代码:Author:jimmyDate:20221221:32Description:publicclassZookeeperOri{publicstaticvoidmain(String〔〕args)throwsIOException,KeeperException,InterruptedException{连接zk服务器ZooKeeperzknewZooKeeper(127。0。0。1,2000,newWatcher(){Overridepublicvoidprocess(WatchedEventwatchedEvent){System。out。println(zookeeper连接);}});节点基本操作获取子节点ListStringchildrenzk。getChildren(,null);children。forEach(eSystem。out。println(e));判断是否存在节点、删除节点注意删除节点的时候必须不存在子节点if(null!zk。exists(my1,null)){zk。delete(my1,1);}原生客户端创建节点CreateModePERSISTENT持久PERSISTENTSEQUENTIAL有序持久EPHEMERAL临时EPHEMERALSEQUENTIAL有序临时这里需要设置权限不然会报错【KeeperErrorCodeMarshallingError】Stringcrszk。create(my1,testmy。getBytes(),ZooDefs。Ids。OPENACLUNSAFE,CreateMode。PERSISTENT);System。out。println(crs);byte〔〕rszk。getData(my1,null,newStat());System。out。println(newString(rs));非顺序节点不能重复创建try{Stringcrstmpzk。create(my1,testmy。getBytes(),ZooDefs。Ids。OPENACLUNSAFE,CreateMode。PERSISTENT);}catch(Exceptione){System。out。println(ex:e。getMessage());}设置值Statstatzk。setData(my1,mytest。getBytes(),1);System。out。println(JSONObject。toJSONString(stat));获取值rszk。getData(my1,null,newStat());System。out。println(newString(rs));创建有序节点UUIDuuidnewUUID(12,1);crszk。create(mysequen,(testmysequenuuid。toString())。getBytes(),ZooDefs。Ids。OPENACLUNSAFE,CreateMode。PERSISTENTSEQUENTIAL);System。out。println(crs);ListStringchildrenszk。getChildren(my,null);childrens。forEach(eSystem。out。println(e));有序节点的获取,不能直接使用节点名称,这里会报错try{rszk。getData(mysequen,null,newStat());System。out。println(newString(rs));}catch(Exceptione){System。out。println(ex:e。getMessage());}System。out。println(有序节点的获取);for(StringnodeKey:childrens){byte〔〕tmpzk。getData(mynodeKey,null,newStat());System。out。println(nodeKey:nodeKey,value:newString(tmp));}监听StringepNodezk。create(myEp,testmyEp。getBytes(),ZooDefs。Ids。OPENACLUNSAFE,CreateMode。EPHEMERAL);zk。getData(myEp,newWatcherDefine(zk),newStat());触发watchzk。setData(myEp,uu。getBytes(),1);zk。close();}staticclassWatcherDefineimplementsWatcher{privateZooKeeperzookeeper;publicWatcherDefine(ZooKeeperzookeeper){this。zookeeperzookeeper;}Overridepublicvoidprocess(WatchedEventwatchedEvent){StatstatnewStat();如果当前的连接状态是连接成功的,那么通过计数器去控制if(watchedEvent。getState()Watcher。Event。KeeperState。SyncConnected){if(Watcher。Event。EventType。NonewatchedEvent。getType()nullwatchedEvent。getPath()){System。out。println(watchedEvent。getState()watchedEvent。getType());}elseif(watchedEvent。getType()Watcher。Event。EventType。NodeDataChanged){try{System。out。println(数据变更触发路径:watchedEvent。getPath()改变后的值:newString(zookeeper。getData(watchedEvent。getPath(),true,stat)));}catch(KeeperExceptione){e。printStackTrace();}catch(InterruptedExceptione){e。printStackTrace();}}elseif(watchedEvent。getType()Watcher。Event。EventType。NodeChildrenChanged){子节点的数据变化会触发try{System。out。println(子节点数据变更路径:watchedEvent。getPath()节点的值:zookeeper。getData(watchedEvent。getPath(),true,stat));}catch(KeeperExceptione){e。printStackTrace();}catch(InterruptedExceptione){e。printStackTrace();}}elseif(watchedEvent。getType()Watcher。Event。EventType。NodeCreated){创建子节点的时候会触发try{System。out。println(节点创建路径:watchedEvent。getPath()节点的值:zookeeper。getData(watchedEvent。getPath(),true,stat));}catch(KeeperExceptione){e。printStackTrace();}catch(InterruptedExceptione){e。printStackTrace();}}elseif(watchedEvent。getType()Watcher。Event。EventType。NodeDeleted){子节点删除会触发System。out。println(节点删除路径:watchedEvent。getPath());}}}}}zkClinet
maven依赖!zkclient依赖dependencygroupIdcom。101tecgroupIdzkclientartifactIdversion0。10versiondependency
我在下面测试的时候遇到了乱码问题分为2部分(我是windows测试环境)无论是否是中文写入的时候都是乱码实现ZkSerializer来解决。上面问题解决后中文仍然是乱码windows的cmdpowershell的锅,需要设置为UTF8临时设置:CHCP65001这个仍然会存在乱码的问题,因此我在练习类种后面把数据打出来了。
练习类:Author:jimmyDate:20221212:00Description:publicclassZkClientPp{publicstaticvoidmain(String〔〕args)throwsInterruptedException,UnsupportedEncodingException{ZkClientzkClientnewZkClient(127。0。0。1:2181);解决乱码问题zkClient。setZkSerializer(newZkCodeSerializer());if(zkClient。exists(prc)){zkClient。delete(prc);}创建持久化节点zkClient。createPersistent(prc);zkClient。writeData(prc,handsomeboy);读取值StringdatazkClient。readData(prc);System。out。println(data);获取子节点ListStringchildrenzkClient。getChildren();children。forEach(cSystem。out。println(c));System。out。println();监听zkClient。subscribeDataChanges(prc,newIZkDataListener(){OverridepublicvoidhandleDataChange(Strings,Objecto)throwsException{System。out。println();System。out。println(节点名称:s节点修改后的值o);}OverridepublicvoidhandleDataDeleted(Strings)throwsException{System。out。println();System。out。println(节点名称:s删除了);}});Thread。sleep(2000);newString(老子叫工藤新一。getBytes(utf8)zkClient。writeData(prc,老子叫工藤新一!);StringszkClient。readData(prc);System。out。println(第一次变更prc);System。out。println(s);Thread。sleep(5000);zkClient。writeData(prc,变成了一个小屁孩);datazkClient。readData(prc);System。out。println(第二次变更prc);System。out。println(data);修改完后,若程序立即结束,则无法看到watch的信息。Thread。sleep(Integer。MAXVALUE);zkClient。close();}乱码解决类staticclassZkCodeSerializerimplementsZkSerializer{Overridepublicbyte〔〕serialize(Objectobject)throwsZkMarshallingError{return((String)object)。getBytes(Charset。forName(UTF8));}OverridepublicObjectdeserialize(byte〔〕bytes)throwsZkMarshallingError{returnnewString(bytes,Charset。forName(UTF8));}}}curator
maven依赖dependencygroupIdorg。apache。curatorgroupIdcuratorframeworkartifactIdversion2。11。0versiondependency
代码:Author:jimmyDate:20221315:25Description:publicclassCuratorClinet{publicstaticvoidmain(String〔〕args)throwsInterruptedException{CuratorFrameworkcuratorFrameworkCuratorFrameworkFactory。newClient(127。0。0。1:2181,5000,2000,newExponentialBackoffRetry(1000,3));curatorFramework。start();创建节点try{StringresultcuratorFramework。create()。creatingParentsIfNeeded()。withMode(CreateMode。PERSISTENT)。forPath(curatorC1C101,curator。getBytes());System。out。println(result);}catch(Exceptione){e。printStackTrace();}删除节点try{默认情况下,version为1递归删除curatorFramework。delete()。deletingChildrenIfNeeded()。forPath(curator);}catch(Exceptione){e。printStackTrace();}查询StatstatnewStat();try{byte〔〕bytescuratorFramework。getData()。storingStatIn(stat)。forPath(my);System。out。println(newString(bytes));}catch(Exceptione){e。printStackTrace();}更新try{StatscuratorFramework。setData()。forPath(my,123。getBytes());System。out。println(s);}catch(Exceptione){e。printStackTrace();}异步操作ExecutorServiceserviceExecutors。newFixedThreadPool(1);CountDownLatchcountDownLatchnewCountDownLatch(1);try{curatorFramework。create()。creatingParentsIfNeeded()。withMode(CreateMode。EPHEMERAL)。inBackground(newBackgroundCallback(){OverridepublicvoidprocessResult(CuratorFrameworkcuratorFramework,CuratorEventcuratorEvent)throwsException{System。out。println(Thread。currentThread()。getName()resultCode:curatorEvent。getResultCode()curatorEvent。getType());countDownLatch。countDown();}},service)。forPath(prc,behappy。getBytes());}catch(Exceptione){e。printStackTrace();}countDownLatch。await();service。shutdown();事务操作(curator独有的)try{CollectionCuratorTransactionResultresultCollectionscuratorFramework。inTransaction()。create()。forPath(demo1,111。getBytes())。and()。setData()。forPath(abc,trunsaction:test。getBytes())。and()。commit();for(CuratorTransactionResultresult:resultCollections){System。out。println(result。getForPath()result。getType());}}catch(Exceptione){e。printStackTrace();}}}