关键代码WindowedStreamString,Integer,TimeWindowoutputLateDatasource。keyBy(v1)。window(TumblingEventTimeWindows。of(Time。seconds(5)))。sideOutputLateData(newOutputTagString(late){});SingleOutputStreamOperatorStringresultoutputLateData。process(newProcessWindowFunctionString,String,Integer,TimeWindow(){Overridepublicvoidprocess(Integerinteger,Contextcontext,IterableStringelements,CollectorStringout)throwsException{out。collect(窗口中共有:elements。spliterator()。getExactSizeIfKnown());}});result。print();完整代码packageday05;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。datastream。SingleOutputStreamOperator;importorg。apache。flink。streaming。api。datastream。WindowedStream;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。streaming。api。functions。source。SourceFunction;importorg。apache。flink。streaming。api。functions。windowing。ProcessWindowFunction;importorg。apache。flink。streaming。api。watermark。Watermark;importorg。apache。flink。streaming。api。windowing。assigners。TumblingEventTimeWindows;importorg。apache。flink。streaming。api。windowing。time。Time;importorg。apache。flink。streaming。api。windowing。windows。TimeWindow;importorg。apache。flink。util。Collector;importorg。apache。flink。util。OutputTag;program:bigDatalearndescription:侧输出标签是单例模式author:Mr。逗create:2021092414:21publicclassSingleBySideOutput{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。setParallelism(1);DataStreamSourceStringsourceenv。addSource(newSourceFunctionString(){Overridepublicvoidrun(SourceContextStringctx)throwsException{ctx。collectWithTimestamp(a,1000L);ctx。emitWatermark(newWatermark(999L));ctx。collectWithTimestamp(a,2000L);ctx。emitWatermark(newWatermark(1999L));ctx。collectWithTimestamp(a,4000L);ctx。emitWatermark(newWatermark(4999L));ctx。collectWithTimestamp(a,3000L);}Overridepublicvoidcancel(){}});WindowedStreamString,Integer,TimeWindowoutputLateDatasource。keyBy(v1)。window(TumblingEventTimeWindows。of(Time。seconds(5)))。sideOutputLateData(newOutputTagString(late){});SingleOutputStreamOperatorStringresultoutputLateData。process(newProcessWindowFunctionString,String,Integer,TimeWindow(){Overridepublicvoidprocess(Integerinteger,Contextcontext,IterableStringelements,CollectorStringout)throwsException{out。collect(窗口中共有:elements。spliterator()。getExactSizeIfKnown());}});result。print();result。getSideOutput(newOutputTagString(late){})。print(late:);StringnameSingleBySideOutput。class。getName();try{env。execute(name);}catch(Exceptione){e。printStackTrace();}}}结果展示C:ProgramFilesJavajdk1。8。0191binjava。exejavaagent:F:appIntelliJIDEA2019。3。3libideart。jar63387:F:appIntelliJIDEA2019。3。3binDfile。encodingUTF8classpathC:ProgramFilesJavajdk1。8。0191jrelibcharsets。jar;C:ProgramFilesJavajdk1。8。0191jrelibdeploy。jar;C:ProgramFilesJavajdk1。8。0191jrelibextaccessbridge64。jar;C:ProgramFilesJavajdk1。8。0191jrelibextcldrdata。jar;C:ProgramFilesJavajdk1。8。0191jrelibextdnsns。jar;C:ProgramFilesJavajdk1。8。0191jrelibextjaccess。jar;C:ProgramFilesJavajdk1。8。0191jrelibextjfxrt。jar;C:ProgramFilesJavajdk1。8。0191jrelibextlocaledata。jar;C:ProgramFilesJavajdk1。8。0191jrelibextashorn。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunec。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunjceprovider。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunmscapi。jar;C:ProgramFilesJavajdk1。8。0191jrelibextsunpkcs11。jar;C:ProgramFilesJavajdk1。8。0191jrelibextzipfs。jar;C:ProgramFilesJavajdk1。8。0191jrelibjavaws。jar;C:ProgramFilesJavajdk1。8。0191jrelibjce。jar;C:ProgramFilesJavajdk1。8。0191jrelibjfr。jar;C:ProgramFilesJavajdk1。8。0191jrelibjfxswt。jar;C:ProgramFilesJavajdk1。8。0191jrelibjsse。jar;C:ProgramFilesJavajdk1。8。0191jrelibmanagementagent。jar;C:ProgramFilesJavajdk1。8。0191jrelibplugin。jar;C:ProgramFilesJavajdk1。8。0191jrelibresources。jar;C:ProgramFilesJavajdk1。8。0191jrelibrt。jar;G:bigDatalearnFlinklearnargetclasses;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkjava1。13。0flinkjava1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkcore1。13。0flinkcore1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkannotations1。13。0flinkannotations1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkmetricscore1。13。0flinkmetricscore1。13。0。jar;G:appapachemaven3。6。3mavenrepositorycomesotericsoftwarekryokryo2。24。0kryo2。24。0。jar;G:appapachemaven3。6。3mavenrepositorycomesotericsoftwareminlogminlog1。2minlog1。2。jar;G:appapachemaven3。6。3mavenrepositoryorgobjenesisobjenesis2。1objenesis2。1。jar;G:appapachemaven3。6。3mavenrepositorycommonscollectionscommonscollections3。2。2commonscollections3。2。2。jar;G:appapachemaven3。6。3mavenrepositoryorgapachecommonscommonscompress1。20commonscompress1。20。jar;G:appapachemaven3。6。3mavenrepositoryorgapachecommonscommonslang33。3。2commonslang33。3。2。jar;G:appapachemaven3。6。3mavenrepositoryorgapachecommonscommonsmath33。5commonsmath33。5。jar;G:appapachemaven3。6。3mavenrepositoryorgslf4jslf4japi1。7。15slf4japi1。7。15。jar;G:appapachemaven3。6。3mavenrepositorycomgooglecodefindbugsjsr3051。3。9jsr3051。3。9。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkforceshading1。13。0forceshading1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkstreamingjava2。121。13。0flinkstreamingjava2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkfilesinkcommon1。13。0flinkfilesinkcommon1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkruntime2。121。13。0flinkruntime2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkqueryablestateclientjava1。13。0flinkqueryablestateclientjava1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkhadoopfs1。13。0flinkhadoopfs1。13。0。jar;G:appapachemaven3。6。3mavenrepositorycommonsiocommonsio2。7commonsio2。7。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkshadednetty4。1。49。Final13。0flinkshadednetty4。1。49。Final13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkshadedjackson2。12。113。0flinkshadedjackson2。12。113。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkshadedzookeeper33。4。1413。0flinkshadedzookeeper33。4。1413。0。jar;G:appapachemaven3。6。3mavenrepositoryorgjavassistjavassist3。24。0GAjavassist3。24。0GA。jar;G:appapachemaven3。6。3mavenrepositorycomypesafeakkaakkaactor2。122。5。21akkaactor2。122。5。21。jar;G:appapachemaven3。6。3mavenrepositorycomypesafeconfig1。3。3config1。3。3。jar;G:appapachemaven3。6。3mavenrepositoryorgscalalangmodulesscalajava8compat2。12。8。0scalajava8compat2。120。8。0。jar;G:appapachemaven3。6。3mavenrepositorycomypesafeakkaakkastream2。122。5。21akkastream2。122。5。21。jar;G:appapachemaven3。6。3mavenrepositoryorgreactivestreamsreactivestreams1。0。2reactivestreams1。0。2。jar;G:appapachemaven3。6。3mavenrepositorycomypesafesslconfigcore2。12。3。7sslconfigcore2。120。3。7。jar;G:appapachemaven3。6。3mavenrepositoryorgscalalangmodulesscalaparsercombinators2。121。1。1scalaparsercombinators2。121。1。1。jar;G:appapachemaven3。6。3mavenrepositorycomypesafeakkaakkaprotobuf2。122。5。21akkaprotobuf2。122。5。21。jar;G:appapachemaven3。6。3mavenrepositorycomypesafeakkaakkaslf4j2。122。5。21akkaslf4j2。122。5。21。jar;G:appapachemaven3。6。3mavenrepositoryorgclappergrizzledslf4j2。121。3。2grizzledslf4j2。121。3。2。jar;G:appapachemaven3。6。3mavenrepositorycomgithubscoptscopt2。123。5。0scopt2。123。5。0。jar;G:appapachemaven3。6。3mavenrepositoryorgxerialsnappysnappyjava1。1。8。3snappyjava1。1。8。3。jar;G:appapachemaven3。6。3mavenrepositorycomwitterchill2。12。7。6chill2。120。7。6。jar;G:appapachemaven3。6。3mavenrepositorycomwitterchilljava。7。6chilljava0。7。6。jar;G:appapachemaven3。6。3mavenrepositoryorglz4lz4java1。6。0lz4java1。6。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkshadedguava18。013。0flinkshadedguava18。013。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkclients2。121。13。0flinkclients2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkoptimizer2。121。13。0flinkoptimizer2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositorycommonsclicommonscli1。3。1commonscli1。3。1。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinktableapijavabridge2。121。13。0flinktableapijavabridge2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinktableapijava1。13。0flinktableapijava1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinktableplannerblink2。121。13。0flinktableplannerblink2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinktableapiscala2。121。13。0flinktableapiscala2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinktableapiscalabridge2。121。13。0flinktableapiscalabridge2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinktableruntimeblink2。121。13。0flinktableruntimeblink2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgcodehausjaninojanino3。0。11janino3。0。11。jar;G:appapachemaven3。6。3mavenrepositoryorgcodehausjaninocommonscompiler3。0。11commonscompiler3。0。11。jar;G:appapachemaven3。6。3mavenrepositoryorgapachecalciteavaticaavaticacore1。17。0avaticacore1。17。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkstreamingscala2。121。13。0flinkstreamingscala2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkscala2。121。13。0flinkscala2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgscalalangscalareflect2。12。7scalareflect2。12。7。jar;G:appapachemaven3。6。3mavenrepositoryorgscalalangscalalibrary2。12。7scalalibrary2。12。7。jar;G:appapachemaven3。6。3mavenrepositoryorgscalalangscalacompiler2。12。7scalacompiler2。12。7。jar;G:appapachemaven3。6。3mavenrepositoryorgscalalangmodulesscalaxml2。121。0。6scalaxml2。121。0。6。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinktablecommon1。13。0flinktablecommon1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkconnectorfiles1。13。0flinkconnectorfiles1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkshadedasm77。113。0flinkshadedasm77。113。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkcep2。121。13。0flinkcep2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkcsv1。13。0flinkcsv1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkconnectorkafka2。121。13。0flinkconnectorkafka2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapachekafkakafkaclients2。4。1kafkaclients2。4。1。jar;G:appapachemaven3。6。3mavenrepositorycomgithublubenzstdjni1。4。31zstdjni1。4。31。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkconnectorbase1。13。0flinkconnectorbase1。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapachebahirflinkconnectorredis2。111。0flinkconnectorredis2。111。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkstreamingjava2。111。2。0flinkstreamingjava2。111。2。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkruntime2。111。2。0flinkruntime2。111。2。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkshadedhadoop21。2。0flinkshadedhadoop21。2。0。jar;G:appapachemaven3。6。3mavenrepositoryorgukaanixz1。0xz1。0。jar;G:appapachemaven3。6。3mavenrepositoryxmlencxmlenc。52xmlenc0。52。jar;G:appapachemaven3。6。3mavenrepositorycommonscodeccommonscodec1。4commonscodec1。4。jar;G:appapachemaven3。6。3mavenrepositorycommonsnetcommonsnet3。1commonsnet3。1。jar;G:appapachemaven3。6。3mavenrepositoryjavaxservletservletapi2。5servletapi2。5。jar;G:appapachemaven3。6。3mavenrepositoryorgmortbayjettyjettyutil6。1。26jettyutil6。1。26。jar;G:appapachemaven3。6。3mavenrepositorycomsunjerseyjerseycore1。9jerseycore1。9。jar;G:appapachemaven3。6。3mavenrepositorycommonselcommonsel1。0commonsel1。0。jar;G:appapachemaven3。6。3mavenrepositorycommonsloggingcommonslogging1。1。3commonslogging1。1。3。jar;G:appapachemaven3。6。3mavenrepositorycomjamesmurtyutilsjavaxmlbuilder。4javaxmlbuilder0。4。jar;G:appapachemaven3。6。3mavenrepositorycommonslangcommonslang2。6commonslang2。6。jar;G:appapachemaven3。6。3mavenrepositorycommonsconfigurationcommonsconfiguration1。7commonsconfiguration1。7。jar;G:appapachemaven3。6。3mavenrepositorycommonsdigestercommonsdigester1。8。1commonsdigester1。8。1。jar;G:appapachemaven3。6。3mavenrepositoryorgcodehausjacksonjacksoncoreasl1。8。8jacksoncoreasl1。8。8。jar;G:appapachemaven3。6。3mavenrepositoryorgcodehausjacksonjacksonmapperasl1。8。8jacksonmapperasl1。8。8。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheavroavro1。7。7avro1。7。7。jar;G:appapachemaven3。6。3mavenrepositorycomhoughtworksparanamerparanamer2。3paranamer2。3。jar;G:appapachemaven3。6。3mavenrepositorycomjcraftjsch。1。42jsch0。1。42。jar;G:appapachemaven3。6。3mavenrepositorycommonsbeanutilscommonsbeanutilsbeancollections1。8。3commonsbeanutilsbeancollections1。8。3。jar;G:appapachemaven3。6。3mavenrepositorycommonsdaemoncommonsdaemon1。0。13commonsdaemon1。0。13。jar;G:appapachemaven3。6。3mavenrepositoryjavaxxmlbindjaxbapi2。2。2jaxbapi2。2。2。jar;G:appapachemaven3。6。3mavenrepositoryjavaxxmlstreamstaxapi1。02staxapi1。02。jar;G:appapachemaven3。6。3mavenrepositoryjavaxactivationactivation1。1activation1。1。jar;G:appapachemaven3。6。3mavenrepositoryioettyettyall4。0。27。Finalettyall4。0。27。Final。jar;G:appapachemaven3。6。3mavenrepositorycomdataartisansflakkaactor2。112。3customflakkaactor2。112。3custom。jar;G:appapachemaven3。6。3mavenrepositorycomdataartisansflakkaremote2。112。3customflakkaremote2。112。3custom。jar;G:appapachemaven3。6。3mavenrepositoryioettyetty3。8。0。Finaletty3。8。0。Final。jar;G:appapachemaven3。6。3mavenrepositoryorguncommonsmathsuncommonsmaths1。2。2auncommonsmaths1。2。2a。jar;G:appapachemaven3。6。3mavenrepositorycomdataartisansflakkaslf4j2。112。3customflakkaslf4j2。112。3custom。jar;G:appapachemaven3。6。3mavenrepositoryorgclappergrizzledslf4j2。111。0。2grizzledslf4j2。111。0。2。jar;G:appapachemaven3。6。3mavenrepositorycomgithubscoptscopt2。113。2。0scopt2。113。2。0。jar;G:appapachemaven3。6。3mavenrepositorycomfasterxmljacksoncorejacksoncore2。7。4jacksoncore2。7。4。jar;G:appapachemaven3。6。3mavenrepositorycomfasterxmljacksoncorejacksondatabind2。7。4jacksondatabind2。7。4。jar;G:appapachemaven3。6。3mavenrepositorycomfasterxmljacksoncorejacksonannotations2。7。0jacksonannotations2。7。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapachezookeeperzookeeper3。4。6zookeeper3。4。6。jar;G:appapachemaven3。6。3mavenrepositoryjlinejline。9。94jline0。9。94。jar;G:appapachemaven3。6。3mavenrepositoryjunitjunit3。8。1junit3。8。1。jar;G:appapachemaven3。6。3mavenrepositorycomwitterchill2。11。7。4chill2。110。7。4。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkclients2。111。2。0flinkclients2。111。2。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkoptimizer2。111。2。0flinkoptimizer2。111。2。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheslingorg。apache。sling。commons。json2。0。6org。apache。sling。commons。json2。0。6。jar;G:appapachemaven3。6。3mavenrepositorymysqlmysqlconnectorjava8。0。21mysqlconnectorjava8。0。21。jar;G:appapachemaven3。6。3mavenrepositorycomgoogleprotobufprotobufjava3。11。4protobufjava3。11。4。jar;G:appapachemaven3。6。3mavenrepositoryorgapacheflinkflinkconnectorjdbc2。121。13。0flinkconnectorjdbc2。121。13。0。jar;G:appapachemaven3。6。3mavenrepositoryorgslf4jslf4jlog4j121。7。30slf4jlog4j121。7。30。jar;G:appapachemaven3。6。3mavenrepositorylog4jlog4j1。2。17log4j1。2。17。jar;G:appapachemaven3。6。3mavenrepositoryorgapachelogginglog4jlog4jtoslf4j2。14。0log4jtoslf4j2。14。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapachelogginglog4jlog4japi2。14。0log4japi2。14。0。jar;G:appapachemaven3。6。3mavenrepositoryredisclientsjedis2。9。0jedis2。9。0。jar;G:appapachemaven3。6。3mavenrepositoryorgapachecommonscommonspool22。4。2commonspool22。4。2。jar;G:appapachemaven3。6。3mavenrepositorycomgooglecodegsongson2。8。5gson2。8。5。jarday05。SingleBySideOutputlog4j:WARNNoappenderscouldbefoundforlogger(org。apache。flink。api。java。ClosureCleaner)。log4j:WARNPleaseinitializethelog4jsystemproperly。log4j:WARNSeehttp:logging。apache。orglog4j1。2faq。htmlnoconfigformoreinfo。窗口中共有:3late:aProcessfinishedwithexitcode0