版本 日期 备注 1。0 2019。4。27 文章首发 1。1 2021。6。10 修改标题:从一段代码谈起浅谈JavaIO接口谈谈代码:JavaIO业务代码优化之路1。前言 前阵子休息天日常在寻找项目里不好的代码,看到了这样的一段代码:privateResultsshSameExec(Sessionsession,Stringcmd){if(log。isDebugEnabled()){log。debug(shellcommand:{},cmd);}UserInfouigetUserInfo();session。setUserInfo(ui);intexitStatus0;StringBuilderbuildernewStringBuilder();ChannelExecchannel;InputStreamin;InputStreamerr;try{session。connect(connectTimeout);channel(ChannelExec)session。openChannel(exec);channel。setCommand(cmd);inchannel。getInputStream();errchannel。getErrStream();channel。connect();}catch(Exceptione){thrownewCloudRuntimeException(e);}try{longlastReadLong。MAXVALUE;byte〔〕tmpnewbyte〔1024〕;while(true){while(in。available()0err。available()0){inti0;if(in。available()0){iin。read(tmp,0,1024);}elseif(err。available()0){ierr。read(tmp,0,1024);}if(i0){break;}lastReadSystem。currentTimeMillis();builder。append(newString(tmp,0,i));}if(channel。isClosed()){if(in。available()0){continue;}exitStatuschannel。getExitStatus();break;}if(System。currentTimeMillis()lastReadexeTimeout){break;}}}catch(IOExceptione){thrownewCloudRuntimeException(e);}finally{channel。disconnect();session。disconnect();}if(0!exitStatus){returnResult。createByError(ErrorData。builder()。errorCode(ResultCode。EXECUTESSHFAIL。getCode())。detail(builder。toString())。title(ResultCode。EXECUTESSHFAIL。toString())。build());}else{returnResult。createBySuccess(builder。toString());}} 简单解释一下这段代码即通过ssh到一台机器上,然后执行一些命令。对命令输出的东西,开了一个循环,每一次读一定的位置,然后以字节流的形式读回来。 这段代码有点丑,于是我闻到了学习的味道。 首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream。其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来。 在改良之前,我们先来回顾一下JavaIO的接口定义。2。JavaIO接口知识回顾2。1低级抽象接口:InputStream和OutputStream 这里有同学可能问了,为啥叫它低抽象接口呢?因为它离底层太近了,计算机本来就是处理二进制的,而这两个接口正是用来处理二进制数据流的。 先简单看一眼这两个接口:InputStreamThisabstractclassisthesuperclassofallclassesrepresentinganinputstreamofbytes。pApplicationsthatneedtodefineasubclassofcodeInputStreammustalwaysprovideamethodthatreturnsthenextbyteofinput。authorArthurvanHoffseejava。io。BufferedInputStreamseejava。io。ByteArrayInputStreamseejava。io。DataInputStreamseejava。io。FilterInputStreamseejava。io。InputStreamread()seejava。io。OutputStreamseejava。io。PushbackInputStreamsinceJDK1。0publicabstractclassInputStreamimplementsCloseable{。。。。。}codeOutputStreamThisabstractclassisthesuperclassofallclassesrepresentinganoutputstreamofbytes。Anoutputstreamacceptsoutputbytesandsendsthemtosomesink。pApplicationsthatneedtodefineasubclassofcodeOutputStreammustalwaysprovideatleastamethodthatwritesonebyteofoutput。authorArthurvanHoffseejava。io。BufferedOutputStreamseejava。io。ByteArrayOutputStreamseejava。io。DataOutputStreamseejava。io。FilterOutputStreamseejava。io。InputStreamseejava。io。OutputStreamwrite(int)sinceJDK1。0publicabstractclassOutputStreamimplementsCloseable,Flushable{。。。}code 我们可以发现,它们都实现了Closeable的接口。因此大家在使用这些原生类时,要注意在结束时调用Close方法哦。 这两个接口的常用实现类有:FileInputStream和FileOutputStreamDataInputStream和DataOutputStreamObjectInputStream和ObjectOutputStream2。2高级抽象接口Writer和Reader 为啥说它是高级抽象接口呢?我们先来看看它们的注释:WriterAbstractclassforwritingtocharacterstreams。Theonlymethodsthatasubclassmustimplementarewrite(char〔〕,int,int),flush(),andclose()。Mostsubclasses,however,willoverridesomeofthemethodsdefinedhereinordertoprovidehigherefficiency,additionalfunctionality,orboth。seeWriterseeBufferedWriterseeCharArrayWriterseeFilterWriterseeOutputStreamWriterseeFileWriterseePipedWriterseePrintWriterseeStringWriterseeReaderauthorMarkReinholdsinceJDK1。1publicabstractclassWriterimplementsAppendable,Closeable,Flushable{ReaderAbstractclassforreadingcharacterstreams。Theonlymethodsthatasubclassmustimplementareread(char〔〕,int,int)andclose()。Mostsubclasses,however,willoverridesomeofthemethodsdefinedhereinordertoprovidehigherefficiency,additionalfunctionality,orboth。seeBufferedReaderseeLineNumberReaderseeCharArrayReaderseeInputStreamReaderseeFileReaderseeFilterReaderseePushbackReaderseePipedReaderseeStringReaderseeWriterauthorMarkReinholdsinceJDK1。1publicabstractclassReaderimplementsReadable,Closeable{ 我们可以看到,这个抽象类是用来面向character的,也就是字符。字符的抽象等级必然比字节高,因为字符靠近上层,即人类。2。3优化输入和输出Buffered 如果我们直接使用上述实现类去打开一个文件(如FileWriter、FileReader、FileInputStream、FileOutputStream),对其对象调用read、write、readLine等,每个请求都是由基础OS直接处理的,这会使一个程序效率低得多因为它们都会引发磁盘访问or网络请求等。 为了减少这种开销,Java平台实现缓冲IO流。缓冲输入流从被称为缓冲区(buffer)的存储器区域读出数据;仅当缓冲区是空时,本地输入API才被调用。同样,缓冲输出流,将数据写入到缓存区,只有当缓冲区已满才调用本机输出API。 用于包装非缓存流的缓冲流类有4个:BufferedInputStream和BufferedOutputStream用于创建字节缓冲字节流,BufferedReader和BufferedWriter用于创建字符缓冲字节流。3。着手优化 之前,我们提到了这段代码写得搓的地方:首先是对两个Stream的消费,很显然,在多核环境下,我们同时也只能够消费其中一个Stream。其次,这代码太挫了,自己定义一个tmp,然后1024、1024这样的去取出来。 故此,我们可以考虑对每个Stream都进行包装,支持用线程去消费,其次我们可以用高级抽象分接口去适配Byte,然后去装饰成Buffer。 接下来,我们来看一段ZStack里的工具类ShellUtils,为了节省篇幅,我们仅仅截出它在IDE里的Structure: run方法的核心: 我们可以看到StreamConsumer这个类,我们来看一下它的代码:privatestaticclassStreamConsumerextendsThread{finalInputStreamin;finalPrintWriterout;finalbooleanflush;StreamConsumer(InputStreamin,PrintWriterout,booleanflushEveryWrite){this。inin;this。outout;flushflushEveryWrite;}Overridepublicvoidrun(){BufferedReaderbrnull;try{brnewBufferedReader(newInputStreamReader(in));Stringline;while((linebr。readLine())!null){out。println(line);if(flush){out。flush();}}}catch(Exceptione){logger。warn(e。getMessage(),e);}finally{try{if(br!null){br。close();}}catch(IOExceptione){logger。warn(e。getMessage(),e);}}}} 这段代码已经达到了我们的理想状态:线程消费,高级抽象。3。1使用Kotlin3。1。1KotlinIO 闲话不多说,先贴代码为敬:importjava。io。InputStreamimportjava。io。InputStreamReaderclassStreamGobbler(privatevalinputStream:InputStream,privatevarresult:StringBuilder):Runnable{overridefunrun(){InputStreamReader(inputStream)。buffered()。use{it。lines()。forEach{rresult。append(r)}}}} 还是一样熟悉的配方,我们逐行来解读:定义一个类,并且要求构造函数必须传入InputStream和一个StringBuilder。且实现了Runnable接口,这意味着它可以被线程消费。覆写run方法。我们可以看到InputStream被适配成了InputStreamReader,这意味着它可以输出字符流了,然后我们使用了Kotlin的接口将其装饰成了Buffer。读每一行buffer,并appned到result这个StringBuilder里去。读完就可以告辞了,close。(use会将其关闭)3。1。2KotlinCoroutine 先看一下上面的图,我们都知道内核态线程是由OS调度的,但当一个线程拿到时间片时,却调到了阻塞IO,那么只能等在那边,浪费时间。 而协程则可以解决这个问题,当一个Jobhang住的时候,可以去做别的事情,绕开阻塞。更好的利用时间片。 最后,我们来看一下成品代码:overridefunsshExecWithCoroutine(session:Session,cmd:String):SimpleResultoutString{valuiInnerUserInfo()session。userInfouivalexitStatus:IntvarchannelChannelExec()valinputBuilderStringBuilder()valerrorBuilderStringBuilder()try{session。connect(connectTimeout)channelsession。openChannel(exec)asChannelExecchannel。setCommand(cmd)channel。connect()valinputStreamStreamGobbler(channel。inputStream,inputBuilder)valerrStreamStreamGobbler(channel。errStream,errorBuilder)valcustomJobGlobalScope。launch{customStream(inputStream,errStream)}while(!customJob。isCompleted){waitjobbedone}exitStatuschannel。exitStatus}catch(e:IOException){throwjava。lang。RuntimeException(e)}finally{if(channel。isConnected){channel。disconnect()}if(session。isConnected){session。disconnect()}}returnif(0!exitStatus){returnSimpleResult。createByError(ErrorData。Builder()。errorCode(ResultCode。EXECUTESSHFAIL。value)。detail(errorBuilder。toString())。title(ResultCode。EXECUTESSHFAIL。toString())。build())}else{SimpleResult。createBySuccess(inputBuilder。toString())}}privatesuspendfuncustomStream(inputStream:StreamGobbler,errorStream:StreamGobbler){valinputDeferredGlobalScope。async{inputStream。run()}valerrorDeferredGlobalScope。async{errorStream。run()}inputDeferred。join()errorDeferred。join()} 作者:泊浮目 链接:https:juejin。cnpost6971215096282513416