1、代码逻辑实现packageday03;importorg。apache。flink。api。common。state。MapState;importorg。apache。flink。api。common。state。MapStateDescriptor;importorg。apache。flink。api。common。typeinfo。Types;importorg。apache。flink。configuration。Configuration;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。streaming。api。functions。KeyedProcessFunction;importorg。apache。flink。streaming。api。functions。source。SourceFunction;importorg。apache。flink。util。Collector;importjava。sql。Timestamp;importjava。util。Calendar;importjava。util。Random;program:Flinklearndescription:字典状态变量求平均值author:Mr。逗create:2021091717:12publicclassAvgByMapState{SourceFunction并行度只能为1自定义并行化版本的数据源,需要使用ParallelSourceFunctionpublicstaticclassClickSourceimplementsSourceFunctionEvent{privatebooleanrunningtrue;privateString〔〕userArr{Mary,Bob,Alice,Liz};privateString〔〕urlArr{。home,。cart,。fav,。prod?id1,。prod?id2};privateRandomrandomnewRandom();Overridepublicvoidrun(SourceContextEventctx)throwsException{while(running){collect方法,向下游发送数据ctx。collect(newEvent(userArr〔random。nextInt(userArr。length)〕,urlArr〔random。nextInt(urlArr。length)〕,Calendar。getInstance()。getTimeInMillis()));Thread。sleep(1000L);}}Overridepublicvoidcancel(){runningfalse;}}publicstaticclassEvent{publicStringuser;publicStringurl;publicLongtimestamp;publicEvent(){}publicEvent(Stringuser,Stringurl,Longtimestamp){this。useruser;this。urlurl;this。timestamptimestamp;}OverridepublicStringtoString(){returnEvent{useruser,urlurl,timestampnewTimestamp(timestamp)};}}publicstaticvoidmain(String〔〕args)throwsException{StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。setParallelism(1);DataStreamSourceEventsourceenv。addSource(newClickSource());source。keyBy(v1)。process(newKeyedProcessFunctionInteger,Event,String(){privateMapStateString,LongmapState;Overridepublicvoidopen(Configurationparameters)throwsException{super。open(parameters);mapStategetRuntimeContext()。getMapState(newMapStateDescriptorString,Long(mapstate,Types。STRING,Types。LONG));}OverridepublicvoidprocessElement(Eventvalue,Contextctx,CollectorStringout)throwsException{if(mapState。contains(value。user)){mapState。put(value。user,mapState。get(value。user)1L);}else{mapState。put(value。user,1L);}求PV平均值LonguserNum0L;LongpvSum0L;for(Stringuser:mapState。keys()){userNum1;pvSummapState。get(user);}doubleavg(double)pvSumuserNum;out。collect(pvSumpvSum,userNumuserNum当前pv的平均值是:avg);}})。print();StringnameAvgByMapState。class。getName();env。execute(name);}}2、结果之展示C:ProgramFilesJavajdk1。8。0191binjava。exejavaagent:F:appIntelliJIDEA2019。3。3libideart。jar57482:F:appIntelliJIDEA2019。3。3binDfile。encodingUTF8classpathC:ProgramFilesJavajdk1。8。0191jrelibcharsets。jar;C:ProgramFilesJavajdk1。8。0191jrelibdeploy。jar;C:ProgramFilesJavajdk1。8。0191jrelibextaccessbridge64。jar;C:ProgramFilesJavajdk1。8。0191jrelibextcldrdata。jar;C:ProgramFilesJavajdk1。8。0191jrelibextdnsns。jar;C:ProgramFilesJavajdk1。8。0191jrelibextjaccess。jar;C:ProgramFilesJavajdk1。8。0191jrelibextjfxrt。jar;C:ProgramFilesJavajdk1。8。0191jrelibextlocaledata。jar;C:ProgramFilesJavajdk1。8。0191jrelibextashorn。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunec。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunjceprovider。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunmscapi。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunpkcs11。jar;C:ProgramFilesJavajdk1。8。0191jrelibextzipfs。jar;C:ProgramFilesJavajdk1。8。0191jrelibjavaws。jar;C:ProgramFilesJavajdk1。8。0191jrelibjce。jar;C:ProgramFilesJavajdk1。8。0191jrelibjfr。jar;C:ProgramFilesJavajdk1。8。0191jrelibjfxswt。jar;C:ProgramFilesJavajdk1。8。0191jrelibjsse。jar;C:ProgramFilesJavajdk1。8。0191jrelibmanagementagent。jar;C:ProgramFilesJavajdk1。8。0191jrelibplugin。jar;C:ProgramFilesJavajdk1。8。0191jrelibresources。jar;C:ProgramFilesJavajdk1。8。0191jrelibrt。jar;D:bigDatabigDatalearnFlinklearnargetclasses;C:UsersAdministrator。m2repositoryorgapacheflinkflinkjava1。13。0flinkjava1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkcore1。13。0flinkcore1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkannotations1。13。0flinkannotations1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkmetricscore1。13。0flinkmetricscore1。13。0。jar;C:UsersAdministrator。m2repositorycomesotericsoftwarekryokryo2。24。0kryo2。24。0。jar;C:UsersAdministrator。m2repositorycomesotericsoftwareminlogminlog1。2minlog1。2。jar;C:UsersAdministrator。m2repositoryorgobjenesisobjenesis2。1objenesis2。1。jar;C:UsersAdministrator。m2repositorycommonscollectionscommonscollections3。2。2commonscollections3。2。2。jar;C:UsersAdministrator。m2repositoryorgapachecommonscommonscompress1。20commonscompress1。20。jar;C:UsersAdministrator。m2repositoryorgapachecommonscommonslang33。3。2commonslang33。3。2。jar;C:UsersAdministrator。m2repositoryorgapachecommonscommonsmath33。5commonsmath33。5。jar;C:UsersAdministrator。m2repositoryorgslf4jslf4japi1。7。15slf4japi1。7。15。jar;C:UsersAdministrator。m2repositorycomgooglecodefindbugsjsr3051。3。9jsr3051。3。9。jar;C:UsersAdministrator。m2repositoryorgapacheflinkforceshading1。13。0forceshading1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkstreamingjava2。121。13。0flinkstreamingjava2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkfilesinkcommon1。13。0flinkfilesinkcommon1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkruntime2。121。13。0flinkruntime2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkqueryablestateclientjava1。13。0flinkqueryablestateclientjava1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkhadoopfs1。13。0flinkhadoopfs1。13。0。jar;C:UsersAdministrator。m2repositorycommonsiocommonsio2。7commonsio2。7。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadednetty4。1。49。Final13。0flinkshadednetty4。1。49。Final13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedjackson2。12。113。0flinkshadedjackson2。12。113。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedzookeeper33。4。1413。0flinkshadedzookeeper33。4。1413。0。jar;C:UsersAdministrator。m2repositoryorgjavassistjavassist3。24。0GAjavassist3。24。0GA。jar;C:UsersAdministrator。m2repositorycomypesafeakkaakkaactor2。122。5。21akkaactor2。122。5。21。jar;C:UsersAdministrator。m2repositorycomypesafeconfig1。3。3config1。3。3。jar;C:UsersAdministrator。m2repositoryorgscalalangmodulesscalajava8compat2。12。8。0scalajava8compat2。120。8。0。jar;C:UsersAdministrator。m2repositorycomypesafeakkaakkastream2。122。5。21akkastream2。122。5。21。jar;C:UsersAdministrator。m2repositoryorgreactivestreamsreactivestreams1。0。2reactivestreams1。0。2。jar;C:UsersAdministrator。m2repositorycomypesafesslconfigcore2。12。3。7sslconfigcore2。120。3。7。jar;C:UsersAdministrator。m2repositoryorgscalalangmodulesscalaparsercombinators2。121。1。1scalaparsercombinators2。121。1。1。jar;C:UsersAdministrator。m2repositorycomypesafeakkaakkaprotobuf2。122。5。21akkaprotobuf2。122。5。21。jar;C:UsersAdministrator。m2repositorycomypesafeakkaakkaslf4j2。122。5。21akkaslf4j2。122。5。21。jar;C:UsersAdministrator。m2repositoryorgclappergrizzledslf4j2。121。3。2grizzledslf4j2。121。3。2。jar;C:UsersAdministrator。m2repositorycomgithubscoptscopt2。123。5。0scopt2。123。5。0。jar;C:UsersAdministrator。m2repositoryorgxerialsnappysnappyjava1。1。8。3snappyjava1。1。8。3。jar;C:UsersAdministrator。m2repositorycomwitterchill2。12。7。6chill2。120。7。6。jar;C:UsersAdministrator。m2repositorycomwitterchilljava。7。6chilljava0。7。6。jar;C:UsersAdministrator。m2repositoryorglz4lz4java1。6。0lz4java1。6。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedguava18。013。0flinkshadedguava18。013。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkclients2。121。13。0flinkclients2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkoptimizer2。121。13。0flinkoptimizer2。121。13。0。jar;C:UsersAdministrator。m2repositorycommonsclicommonscli1。3。1commonscli1。3。1。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapijavabridge2。121。13。0flinktableapijavabridge2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapijava1。13。0flinktableapijava1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinktableplannerblink2。121。13。0flinktableplannerblink2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapiscala2。121。13。0flinktableapiscala2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinktableapiscalabridge2。121。13。0flinktableapiscalabridge2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinktableruntimeblink2。121。13。0flinktableruntimeblink2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgcodehausjaninojanino3。0。11janino3。0。11。jar;C:UsersAdministrator。m2repositoryorgcodehausjaninocommonscompiler3。0。11commonscompiler3。0。11。jar;C:UsersAdministrator。m2repositoryorgapachecalciteavaticaavaticacore1。17。0avaticacore1。17。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkstreamingscala2。121。13。0flinkstreamingscala2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkscala2。121。13。0flinkscala2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgscalalangscalareflect2。12。7scalareflect2。12。7。jar;C:UsersAdministrator。m2repositoryorgscalalangscalalibrary2。12。7scalalibrary2。12。7。jar;C:UsersAdministrator。m2repositoryorgscalalangscalacompiler2。12。7scalacompiler2。12。7。jar;C:UsersAdministrator。m2repositoryorgscalalangmodulesscalaxml2。121。0。6scalaxml2。121。0。6。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinktablecommon1。13。0flinktablecommon1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorfiles1。13。0flinkconnectorfiles1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedasm77。113。0flinkshadedasm77。113。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkcep2。121。13。0flinkcep2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkcsv1。13。0flinkcsv1。13。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorkafka2。121。13。0flinkconnectorkafka2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgapachekafkakafkaclients2。4。1kafkaclients2。4。1。jar;C:UsersAdministrator。m2repositorycomgithublubenzstdjni1。4。31zstdjni1。4。31。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorbase1。13。0flinkconnectorbase1。13。0。jar;C:UsersAdministrator。m2repositoryorgapachebahirflinkconnectorredis2。111。0flinkconnectorredis2。111。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkstreamingjava2。111。2。0flinkstreamingjava2。111。2。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkruntime2。111。2。0flinkruntime2。111。2。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkshadedhadoop21。2。0flinkshadedhadoop21。2。0。jar;C:UsersAdministrator。m2repositoryorgukaanixz1。0xz1。0。jar;C:UsersAdministrator。m2repositoryxmlencxmlenc。52xmlenc0。52。jar;C:UsersAdministrator。m2repositorycommonscodeccommonscodec1。4commonscodec1。4。jar;C:UsersAdministrator。m2repositorycommonsnetcommonsnet3。1commonsnet3。1。jar;C:UsersAdministrator。m2repositoryjavaxservletservletapi2。5servletapi2。5。jar;C:UsersAdministrator。m2repositoryorgmortbayjettyjettyutil6。1。26jettyutil6。1。26。jar;C:UsersAdministrator。m2repositorycomsunjerseyjerseycore1。9jerseycore1。9。jar;C:UsersAdministrator。m2repositorycommonselcommonsel1。0commonsel1。0。jar;C:UsersAdministrator。m2repositorycommonsloggingcommonslogging1。1。3commonslogging1。1。3。jar;C:UsersAdministrator。m2repositorycomjamesmurtyutilsjavaxmlbuilder。4javaxmlbuilder0。4。jar;C:UsersAdministrator。m2repositorycommonslangcommonslang2。6commonslang2。6。jar;C:UsersAdministrator。m2repositorycommonsconfigurationcommonsconfiguration1。7commonsconfiguration1。7。jar;C:UsersAdministrator。m2repositorycommonsdigestercommonsdigester1。8。1commonsdigester1。8。1。jar;C:UsersAdministrator。m2repositoryorgcodehausjacksonjacksoncoreasl1。8。8jacksoncoreasl1。8。8。jar;C:UsersAdministrator。m2repositoryorgcodehausjacksonjacksonmapperasl1。8。8jacksonmapperasl1。8。8。jar;C:UsersAdministrator。m2repositoryorgapacheavroavro1。7。7avro1。7。7。jar;C:UsersAdministrator。m2repositorycomhoughtworksparanamerparanamer2。3paranamer2。3。jar;C:UsersAdministrator。m2repositorycomjcraftjsch。1。42jsch0。1。42。jar;C:UsersAdministrator。m2repositorycommonsbeanutilscommonsbeanutilsbeancollections1。8。3commonsbeanutilsbeancollections1。8。3。jar;C:UsersAdministrator。m2repositorycommonsdaemoncommonsdaemon1。0。13commonsdaemon1。0。13。jar;C:UsersAdministrator。m2repositoryjavaxxmlbindjaxbapi2。2。2jaxbapi2。2。2。jar;C:UsersAdministrator。m2repositoryjavaxxmlstreamstaxapi1。02staxapi1。02。jar;C:UsersAdministrator。m2repositoryjavaxactivationactivation1。1activation1。1。jar;C:UsersAdministrator。m2repositoryioettyettyall4。0。27。Finalettyall4。0。27。Final。jar;C:UsersAdministrator。m2repositorycomdataartisansflakkaactor2。112。3customflakkaactor2。112。3custom。jar;C:UsersAdministrator。m2repositorycomdataartisansflakkaremote2。112。3customflakkaremote2。112。3custom。jar;C:UsersAdministrator。m2repositoryioettyetty3。8。0。Finaletty3。8。0。Final。jar;C:UsersAdministrator。m2repositoryorguncommonsmathsuncommonsmaths1。2。2auncommonsmaths1。2。2a。jar;C:UsersAdministrator。m2repositorycomdataartisansflakkaslf4j2。112。3customflakkaslf4j2。112。3custom。jar;C:UsersAdministrator。m2repositoryorgclappergrizzledslf4j2。111。0。2grizzledslf4j2。111。0。2。jar;C:UsersAdministrator。m2repositorycomgithubscoptscopt2。113。2。0scopt2。113。2。0。jar;C:UsersAdministrator。m2repositorycomfasterxmljacksoncorejacksoncore2。7。4jacksoncore2。7。4。jar;C:UsersAdministrator。m2repositorycomfasterxmljacksoncorejacksondatabind2。7。4jacksondatabind2。7。4。jar;C:UsersAdministrator。m2repositorycomfasterxmljacksoncorejacksonannotations2。7。0jacksonannotations2。7。0。jar;C:UsersAdministrator。m2repositoryorgapachezookeeperzookeeper3。4。6zookeeper3。4。6。jar;C:UsersAdministrator。m2repositoryjlinejline。9。94jline0。9。94。jar;C:UsersAdministrator。m2repositoryjunitjunit3。8。1junit3。8。1。jar;C:UsersAdministrator。m2repositorycomwitterchill2。11。7。4chill2。110。7。4。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkclients2。111。2。0flinkclients2。111。2。0。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkoptimizer2。111。2。0flinkoptimizer2。111。2。0。jar;C:UsersAdministrator。m2repositoryorgapacheslingorg。apache。sling。commons。json2。0。6org。apache。sling。commons。json2。0。6。jar;C:UsersAdministrator。m2repositorymysqlmysqlconnectorjava8。0。21mysqlconnectorjava8。0。21。jar;C:UsersAdministrator。m2repositorycomgoogleprotobufprotobufjava3。11。4protobufjava3。11。4。jar;C:UsersAdministrator。m2repositoryorgapacheflinkflinkconnectorjdbc2。121。13。0flinkconnectorjdbc2。121。13。0。jar;C:UsersAdministrator。m2repositoryorgslf4jslf4jlog4j121。7。30slf4jlog4j121。7。30。jar;C:UsersAdministrator。m2repositorylog4jlog4j1。2。17log4j1。2。17。jar;C:UsersAdministrator。m2repositoryorgapachelogginglog4jlog4jtoslf4j2。14。0log4jtoslf4j2。14。0。jar;C:UsersAdministrator。m2repositoryorgapachelogginglog4jlog4japi2。14。0log4japi2。14。0。jar;C:UsersAdministrator。m2repositoryredisclientsjedis2。9。0jedis2。9。0。jar;C:UsersAdministrator。m2repositoryorgapachecommonscommonspool22。4。2commonspool22。4。2。jar;C:UsersAdministrator。m2repositorycomgooglecodegsongson2。8。5gson2。8。5。jarday03。AvgByMapStatelog4j:WARNNoappenderscouldbefoundforlogger(org。apache。flink。api。java。ClosureCleaner)。log4j:WARNPleaseinitializethelog4jsystemproperly。log4j:WARNSeehttp:logging。apache。orglog4j1。2faq。htmlnoconfigformoreinfo。pvSum1,userNum1当前pv的平均值是:1。0pvSum2,userNum1当前pv的平均值是:2。0pvSum3,userNum2当前pv的平均值是:1。5pvSum4,userNum2当前pv的平均值是:2。0pvSum5,userNum3当前pv的平均值是:1。6666666666666667pvSum6,userNum3当前pv的平均值是:2。0pvSum7,userNum3当前pv的平均值是:2。3333333333333335pvSum8,userNum4当前pv的平均值是:2。0pvSum9,userNum4当前pv的平均值是:2。25pvSum10,userNum4当前pv的平均值是:2。5pvSum11,userNum4当前pv的平均值是:2。75pvSum12,userNum4当前pv的平均值是:3。0pvSum13,userNum4当前pv的平均值是:3。25pvSum14,userNum4当前pv的平均值是:3。5pvSum15,userNum4当前pv的平均值是:3。75pvSum16,userNum4当前pv的平均值是:4。0pvSum17,userNum4当前pv的平均值是:4。25pvSum18,userNum4当前pv的平均值是:4。5pvSum19,userNum4当前pv的平均值是:4。75pvSum20,userNum4当前pv的平均值是:5。0pvSum21,userNum4当前pv的平均值是:5。25pvSum22,userNum4当前pv的平均值是:5。5pvSum23,userNum4当前pv的平均值是:5。75pvSum24,userNum4当前pv的平均值是:6。0pvSum25,userNum4当前pv的平均值是:6。25pvSum26,userNum4当前pv的平均值是:6。5pvSum27,userNum4当前pv的平均值是:6。75pvSum28,userNum4当前pv的平均值是:7。0pvSum29,userNum4当前pv的平均值是:7。25pvSum30,userNum4当前pv的平均值是:7。5pvSum31,userNum4当前pv的平均值是:7。75pvSum32,userNum4当前pv的平均值是:8。0pvSum33,userNum4当前pv的平均值是:8。25pvSum34,userNum4当前pv的平均值是:8。5pvSum35,userNum4当前pv的平均值是:8。75pvSum36,userNum4当前pv的平均值是:9。0