1、代码逻辑实现
package day03;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Random;

/**
 * Keyed-state demo: keeps a running (sum, count) of a stream of random integers and
 * periodically emits their average.
 *
 * <p>program: Flinklearn
 * <br>description: state variables
 * <br>author: Mr.逗
 * <br>create: 2021-09-17 15:21
 */
public class StateDemo {

    /** Delay, in milliseconds, between arming the timer and emitting the average. */
    private static final long EMIT_INTERVAL_MS = 10 * 1000L;

    /**
     * Builds and runs the job: a source emitting one random integer in [0, 10) per second,
     * routed to a single key, processed by a function that accumulates (sum, count) in keyed
     * state and prints the average whenever its timer fires.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> source = env.addSource(new SourceFunction<Integer>() {
            // cancel() is invoked from a different thread than run(), so the flag must be
            // volatile for the loop to observe the update.
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                while (isRunning) {
                    ctx.collect(random.nextInt(10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        SingleOutputStreamOperator<Double> process = source
                // Every element maps to the same key, so all of them share one state instance.
                .keyBy(v -> true)
                .process(new KeyedProcessFunction<Boolean, Integer, Double>() {
                    // State variable used as the accumulator: f0 = sum, f1 = count.
                    // State is scoped to the current key; the handle itself is created once.
                    private ValueState<Tuple2<Integer, Integer>> valueState;
                    // Holds the timestamp of the currently registered timer (null if none).
                    private ValueState<Long> timeTs;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // Instantiate the state handles.
                        valueState = getRuntimeContext().getState(
                                new ValueStateDescriptor<Tuple2<Integer, Integer>>(
                                        "sumcount", Types.TUPLE(Types.INT, Types.INT)));
                        timeTs = getRuntimeContext().getState(
                                new ValueStateDescriptor<Long>("timer", Types.LONG));
                    }

                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Double> out)
                            throws Exception {
                        // When the first record arrives the state is still null.
                        // value() reads the state, update() writes it.
                        Tuple2<Integer, Integer> sumCount = valueState.value();
                        if (sumCount == null) {
                            valueState.update(Tuple2.of(value, 1));
                        } else {
                            valueState.update(Tuple2.of(sumCount.f0 + value, sumCount.f1 + 1));
                        }

                        // Arm a timer only when none is pending.
                        if (timeTs.value() == null) {
                            long tenSecLater =
                                    ctx.timerService().currentProcessingTime() + EMIT_INTERVAL_MS;
                            // The timestamp is derived from processing time, so it must be a
                            // processing-time timer. An event-time timer would wait for a
                            // watermark that this job never emits and therefore never fire.
                            ctx.timerService().registerProcessingTimeTimer(tenSecLater);
                            timeTs.update(tenSecLater);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out)
                            throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        Tuple2<Integer, Integer> sumCount = valueState.value();
                        if (sumCount != null) {
                            out.collect((double) sumCount.f0 / sumCount.f1);
                            // Clearing the marker lets the next element arm a new timer; the
                            // accumulator is kept, so the emitted value is a running average.
                            timeTs.clear();
                        }
                    }
                });

        process.print();

        String name = StateDemo.class.getName();
        env.execute(name);
    }
}
2、结果之展示C:ProgramFilesJavajdk1。8。0191binjava。exejavaagent:F:appIntelliJIDEA2019。3。3libideart。jar55777:F:appIntelliJIDEA2019。3。3binDfile。encodingUTF8classpathC:ProgramFilesJavajdk1。8。0191jrelibcharsets。C:ProgramFilesJavajdk1。8。0191jrelibdeploy。C:ProgramFilesJavajdk1。8。0191jrelibextaccessbridge64。C:ProgramFilesJavajdk1。8。0191jrelibextcldrdata。C:ProgramFilesJavajdk1。8。0191jrelibextdnsns。C:ProgramFilesJavajdk1。8。0191jrelibextjaccess。C:ProgramFilesJavajdk1。8。0191jrelibextjfxrt。C:ProgramFilesJavajdk1。8。0191jrelibextlocaledata。C:ProgramFilesJavajdk1。8。0191jrelibextashorn。C:ProgramFilesJavajdk1。8。0191jrelibextsunec。C:ProgramFilesJavajdk1。8。0191jrelibextsunjceprovider。C:ProgramFilesJavajdk1。8。0191jrelibextsunmscapi。C:ProgramFilesJavajdk1。8。0191jrelibextsunpkcs11。C:ProgramFilesJavajdk1。8。0191jrelibextzipfs。C:ProgramFilesJavajdk1。8。0191jrelibjavaws。C:ProgramFilesJavajdk1。8。0191jrelibjce。C:ProgramFilesJavajdk1。8。0191jrelibjfr。C:ProgramFilesJavajdk1。8。0191jrelibjfxswt。C:ProgramFilesJavajdk1。8。0191jrelibjsse。C:ProgramFilesJavajdk1。8。0191jrelibmanagementagent。C:ProgramFilesJavajdk1。8。0191jrelibplugin。C:ProgramFilesJavajdk1。8。0191jrelibresources。C:ProgramFilesJavajdk1。8。0191jrelibrt。D:bigDatabigDatalearnFC:UsersAdministrator。m2repositoryorgapacheflinkflinkjava1。13。0flinkjava1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkcore1。13。0flinkcore1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkannotations1。13。0flinkannotations1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkmetricscore1。13。0flin
kmetricscore1。13。0。C:UsersAdministrator。m2repositorycomesotericsoftwarekryokryo2。24。0kryo2。24。0。C:UsersAdministrator。m2repositorycomesotericsoftwareminlogminlog1。2minlog1。2。C:UsersAdministrator。m2repositoryorgobjenesisobjenesis2。1objenesis2。1。C:UsersAdministrator。m2repositorycommonscollectionscommonscollections3。2。2commonscollections3。2。2。C:UsersAdministrator。m2repositoryorgapachecommonscommonscompress1。20commonscompress1。20。C:UsersAdministrator。m2repositoryorgapachecommonscommonslang33。3。2commonslang33。3。2。C:UsersAdministrator。m2repositoryorgapachecommonscommonsmath33。5commonsmath33。5。C:UsersAdministrator。m2repositoryorgslf4jslf4japi1。7。15slf4japi1。7。15。C:UsersAdministrator。m2repositorycomgooglecodefindbugsjsr3051。3。9jsr3051。3。9。C:UsersAdministrator。m2repositoryorgapacheflinkforceshading1。13。0forceshading1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkstreamingjava2。121。13。0flinkstreamingjava2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkfilesinkcommon1。13。0flinkfilesinkcommon1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkruntime2。121。13。0flinkruntime2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkqueryablestateclientjava1。13。0flinkqueryablestateclientjava1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkhadoopfs1。13。0flinkhadoopfs1。13。0。C:UsersAdministrator。m2repositorycommonsiocommonsio2。7commonsio2。7。C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadednetty4。1。49。Final13。0flinkshadednetty4。1。49。Final13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedjackson2。12。113。0flinkshadedjackson2。12。113。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedzookeeper33。4。1413。0flinkshadedzookeeper33。4。1413。0。C:UsersAdministrator。m2repositoryorgjavassistjavassist3。24。0GAjavassist3。24。0GA。C:UsersAdministrator。m2repositorycomypesafeakkaakkaactor2。122。5。21akkaactor2。122。5。21。C:UsersAdministrator。m2repositorycomypesafeconfig1。3。3config1。3。3。C:UsersAdministrator。m2repositoryorgscalalangmoduless
calajava8compat2。12。8。0scalajava8compat2。120。8。0。C:UsersAdministrator。m2repositorycomypesafeakkaakkastream2。122。5。21akkastream2。122。5。21。C:UsersAdministrator。m2repositoryorgreactivestreamsreactivestreams1。0。2reactivestreams1。0。2。C:UsersAdministrator。m2repositorycomypesafesslconfigcore2。12。3。7sslconfigcore2。120。3。7。C:UsersAdministrator。m2repositoryorgscalalangmodulesscalaparsercombinators2。121。1。1scalaparsercombinators2。121。1。1。C:UsersAdministrator。m2repositorycomypesafeakkaakkaprotobuf2。122。5。21akkaprotobuf2。122。5。21。C:UsersAdministrator。m2repositorycomypesafeakkaakkaslf4j2。122。5。21akkaslf4j2。122。5。21。C:UsersAdministrator。m2repositoryorgclappergrizzledslf4j2。121。3。2grizzledslf4j2。121。3。2。C:UsersAdministrator。m2repositorycomgithubscoptscopt2。123。5。0scopt2。123。5。0。C:UsersAdministrator。m2repositoryorgxerialsnappysnappyjava1。1。8。3snappyjava1。1。8。3。C:UsersAdministrator。m2repositorycomwitterchill2。12。7。6chill2。120。7。6。C:UsersAdministrator。m2repositorycomwitterchilljava。7。6chilljava0。7。6。C:UsersAdministrator。m2repositoryorglz4lz4java1。6。0lz4java1。6。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedguava18。013。0flinkshadedguava18。013。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkclients2。121。13。0flinkclients2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkoptimizer2。121。13。0flinkoptimizer2。121。13。0。C:UsersAdministrator。m2repositorycommonsclicommonscli1。3。1commonscli1。3。1。C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapijavabridge2。121。13。0flinktableapijavabridge2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapijava1。13。0flinktableapijava1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinktableplannerblink2。121。13。0flinktableplannerblink2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapiscala2。121。13。0flinktableapiscala2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapiscalabridge2。121。13。0flinktableapiscalabridge2。121。13。0。C:UsersAdministrator。m2repositoryorgapache
flinkflinktableruntimeblink2。121。13。0flinktableruntimeblink2。121。13。0。C:UsersAdministrator。m2repositoryorgcodehausjaninojanino3。0。11janino3。0。11。C:UsersAdministrator。m2repositoryorgcodehausjaninocommonscompiler3。0。11commonscompiler3。0。11。C:UsersAdministrator。m2repositoryorgapachecalciteavaticaavaticacore1。17。0avaticacore1。17。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkstreamingscala2。121。13。0flinkstreamingscala2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkscala2。121。13。0flinkscala2。121。13。0。C:UsersAdministrator。m2repositoryorgscalalangscalareflect2。12。7scalareflect2。12。7。C:UsersAdministrator。m2repositoryorgscalalangscalalibrary2。12。7scalalibrary2。12。7。C:UsersAdministrator。m2repositoryorgscalalangscalacompiler2。12。7scalacompiler2。12。7。C:UsersAdministrator。m2repositoryorgscalalangmodulesscalaxml2。121。0。6scalaxml2。121。0。6。C:UsersAdministrator。m2repositoryorgapacheflinkflinktablecommon1。13。0flinktablecommon1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorfiles1。13。0flinkconnectorfiles1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedasm77。113。0flinkshadedasm77。113。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkcep2。121。13。0flinkcep2。121。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkcsv1。13。0flinkcsv1。13。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorkafka2。121。13。0flinkconnectorkafka2。121。13。0。C:UsersAdministrator。m2repositoryorgapachekafkakafkaclients2。4。1kafkaclients2。4。1。C:UsersAdministrator。m2repositorycomgithublubenzstdjni1。4。31zstdjni1。4。31。C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorbase1。13。0flinkconnectorbase1。13。0。C:UsersAdministrator。m2repositoryorgapachebahirflinkconnectorredis2。111。0flinkconnectorredis2。111。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkstreamingjava2。111。2。0flinkstreamingjava2。111。2。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkruntime2。111。2。0flinkruntime2。111。2。0。C:UsersAdministrator。m2repositoryorgapacheflinkfli
nkshadedhadoop21。2。0flinkshadedhadoop21。2。0。C:UsersAdministrator。m2repositoryorgukaanixz1。0xz1。0。C:UsersAdministrator。m2repositoryxmlencxmlenc。52xmlenc0。52。C:UsersAdministrator。m2repositorycommonscodeccommonscodec1。4commonscodec1。4。C:UsersAdministrator。m2repositorycommonsnetcommonsnet3。1commonsnet3。1。C:UsersAdministrator。m2repositoryjavaxservletservletapi2。5servletapi2。5。C:UsersAdministrator。m2repositoryorgmortbayjettyjettyutil6。1。26jettyutil6。1。26。C:UsersAdministrator。m2repositorycomsunjerseyjerseycore1。9jerseycore1。9。C:UsersAdministrator。m2repositorycommonselcommonsel1。0commonsel1。0。C:UsersAdministrator。m2repositorycommonsloggingcommonslogging1。1。3commonslogging1。1。3。C:UsersAdministrator。m2repositorycomjamesmurtyutilsjavaxmlbuilder。4javaxmlbuilder0。4。C:UsersAdministrator。m2repositorycommonslangcommonslang2。6commonslang2。6。C:UsersAdministrator。m2repositorycommonsconfigurationcommonsconfiguration1。7commonsconfiguration1。7。C:UsersAdministrator。m2repositorycommonsdigestercommonsdigester1。8。1commonsdigester1。8。1。C:UsersAdministrator。m2repositoryorgcodehausjacksonjacksoncoreasl1。8。8jacksoncoreasl1。8。8。C:UsersAdministrator。m2repositoryorgcodehausjacksonjacksonmapperasl1。8。8jacksonmapperasl1。8。8。C:UsersAdministrator。m2repositoryorgapacheavroavro1。7。7avro1。7。7。C:UsersAdministrator。m2repositorycomhoughtworksparanamerparanamer2。3paranamer2。3。C:UsersAdministrator。m2repositorycomjcraftjsch。1。42jsch0。1。42。C:UsersAdministrator。m2repositorycommonsbeanutilscommonsbeanutilsbeancollections1。8。3commonsbeanutilsbeancollections1。8。3。C:UsersAdministrator。m2repositorycommonsdaemoncommonsdaemon1。0。13commonsdaemon1。0。13。C:UsersAdministrator。m2repositoryjavaxxmlbindjaxbapi2。2。2jaxbapi2。2。2。C:UsersAdministrator。m2repositoryjavaxxmlstreamstaxapi1。02staxapi1。02。C:UsersAdministrator。m2repositoryjavaxactivationactivation1。1activation1。1。C:UsersAdministrator。m2repositoryioettyettyall4。0。27。Finalettyall4。0。27。Final。C:UsersAdministrator。m2repositorycomdataartisansflakkaactor2。112。3customflakkaactor
2。112。3custom。C:UsersAdministrator。m2repositorycomdataartisansflakkaremote2。112。3customflakkaremote2。112。3custom。C:UsersAdministrator。m2repositoryioettyetty3。8。0。Finaletty3。8。0。Final。C:UsersAdministrator。m2repositoryorguncommonsmathsuncommonsmaths1。2。2auncommonsmaths1。2。2a。C:UsersAdministrator。m2repositorycomdataartisansflakkaslf4j2。112。3customflakkaslf4j2。112。3custom。C:UsersAdministrator。m2repositoryorgclappergrizzledslf4j2。111。0。2grizzledslf4j2。111。0。2。C:UsersAdministrator。m2repositorycomgithubscoptscopt2。113。2。0scopt2。113。2。0。C:UsersAdministrator。m2repositorycomfasterxmljacksoncorejacksoncore2。7。4jacksoncore2。7。4。C:UsersAdministrator。m2repositorycomfasterxmljacksoncorejacksondatabind2。7。4jacksondatabind2。7。4。C:UsersAdministrator。m2repositorycomfasterxmljacksoncorejacksonannotations2。7。0jacksonannotations2。7。0。C:UsersAdministrator。m2repositoryorgapachezookeeperzookeeper3。4。6zookeeper3。4。6。C:UsersAdministrator。m2repositoryjlinejline。9。94jline0。9。94。C:UsersAdministrator。m2repositoryjunitjunit3。8。1junit3。8。1。C:UsersAdministrator。m2repositorycomwitterchill2。11。7。4chill2。110。7。4。C:UsersAdministrator。m2repositoryorgapacheflinkflinkclients2。111。2。0flinkclients2。111。2。0。C:UsersAdministrator。m2repositoryorgapacheflinkflinkoptimizer2。111。2。0flinkoptimizer2。111。2。0。C:UsersAdministrator。m2repositoryorgapacheslingorg。apache。sling。commons。json2。0。6org。apache。sling。commons。json2。0。6。C:UsersAdministrator。m2repositorymysqlmysqlconnectorjava8。0。21mysqlconnectorjava8。0。21。C:UsersAdministrator。m2repositorycomgoogleprotobufprotobufjava3。11。4protobufjava3。11。4。C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorjdbc2。121。13。0flinkconnectorjdbc2。121。13。0。C:UsersAdministrator。m2repositoryorgslf4jslf4jlog4j121。7。30slf4jlog4j121。7。30。C:UsersAdministrator。m2repositorylog4jlog4j1。2。17log4j1。2。17。C:UsersAdministrator。m2repositoryorgapachelogginglog4jlog4jtoslf4j2。14。0log4jtoslf4j2。14。0。C:UsersAdministrator。m2repositoryorgapachelogginglog4jlog4japi2。14。0log4japi2。14。0。C:UsersAdministr
ator。m2repositoryredisclientsjedis2。9。0jedis2。9。0。C:UsersAdministrator。m2repositoryorgapachecommonscommonspool22。4。2commonspool22。4。2。C:UsersAdministrator。m2repositorycomgooglecodegsongson2。8。5gson2。8。5。jarday03。StateDemolog4j:WARNNoappenderscouldbefoundforlogger(org。apache。flink。api。java。ClosureCleaner)。log4j:WARNPleaseinitializethelog4jsystemproperly。log4j:WARNSeehttp:logging。apache。orglog4j1。2faq。htmlnoconfigformoreinfo。