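Writing the Flink program in Scala

    package learn

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time

    /**
     * @program: bigdatalearn
     * @description: {description}
     * @author: Mr.逗
     * @create: 2021-09-08 10:04
     */
    object StreamingJob {
      def main(args: Array[String]): Unit = {
        // Read the socket port from the program arguments (--port <n>);
        // fall back to 9000 when it is missing or invalid.
        val port: Int = try {
          ParameterTool.fromArgs(args).getInt("port")
        } catch {
          case e: Exception =>
            System.err.println("No port set. Use default port 9000!")
            9000
        }

        // Get the execution environment.
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // Connect to the socket to read the input. The host must be the machine
        // where nc is listening; the parsed port is used instead of a hard-coded one.
        val text = env.socketTextStream("172.17.0.50", port, '\n')

        // Brings the implicit TypeInformation needed by flatMap/map into scope.
        import org.apache.flink.api.scala._

        // Parse the data (flatten it), group, compute over windows, and aggregate with sum.
        val wordWithCount = text
          .flatMap(line => line.split(","))
          .map(w => WordWithCount(w, 1))
          .keyBy("word") // group by word
          .timeWindow(Time.seconds(2), Time.seconds(1)) // window size, slide interval
          .sum("count") // either sum or reduce works
          // .reduce((a, b) => WordWithCount(a.word, a.count + b.count))

        wordWithCount.print().setParallelism(1) // print to the console
        env.execute("Socket Window Count!")
      }

      case class WordWithCount(word: String, count: Long)
    }

Open a terminal and run the following command:

    nc -lk 9999

Note that the Scala job above falls back to port 9000 when no port is given, so either pass --port 9999 as a program argument or let nc listen on 9000 instead.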
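To make the two arguments of timeWindow(Time.seconds(2), Time.seconds(1)) more concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a sliding window with a 2-second size and a 1-second slide can assign one timestamp to several overlapping windows. The class and method names are hypothetical. The arithmetic follows the usual sliding-window assignment with a zero offset and non-negative timestamps, and is an illustration rather than Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    /**
     * Returns the start timestamps (in ms) of every sliding window
     * [start, start + sizeMs) that contains timestampMs.
     * Assumes a zero window offset and timestampMs >= 0.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element arriving at t = 2500 ms with size 2 s and slide 1 s
        // belongs to the windows starting at 2000 ms and 1000 ms.
        System.out.println(windowStarts(2500L, 2000L, 1000L));
    }
}
```

With these settings every element is counted in two windows, so a result is emitted every second and each result covers the last two seconds of input.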
Then simply run the program from IDEA.

Writing the Flink program in Java

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class WordCountFromSocket {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Read lines of text from the socket opened with nc.
            DataStream<String> stream = env.socketTextStream("localhost", 9999);

            // Split lines into (word, 1) pairs, group by word, and keep a running sum of field 1.
            stream.flatMap(new Tokenizer()).keyBy(r -> r.f0).sum(1).print();

            env.execute("Flink Streaming Java API Skeleton");
        }

        public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] stringList = value.split("\\s");
                for (String s : stringList) {
                    // Use out.collect to send data downstream.
                    out.collect(new Tuple2<>(s, 1));
                }
            }
        }
    }

2 Download the Flink runtime and run the job by submitting the JAR

Download link: http://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz

Then unpack the archive:

    tar xvfz flink-1.11.1-bin-scala_2.11.tgz

Start the Flink cluster:

    cd flink-1.11.1
    ./bin/start-cluster.sh

You can open the Flink Web UI to check the cluster status: http://localhost:8081

Build the JAR in IDEA with the Maven package phase (mvn package on the command line).

Submit the packaged JAR:

    cd flink-1.11.1
    ./bin/flink run <absolute path of the packaged JAR>

Stop the Flink cluster:

    ./bin/stop-cluster.sh

The standard output of the job is written to the log folder:

    cd flink-1.11.1/log
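When the job runs on the cluster, whatever print() emits ends up in a .out file in that log folder rather than in the IDEA console. As a rough guide to what to look for there, the following plain-Java sketch (no Flink dependency, hypothetical class and method names) imitates what the Java job's flatMap, keyBy and sum(1) chain would emit for a few input lines: one updated (word,count) record per incoming word. It assumes parallelism 1 and Tuple2's usual "(f0,f1)" text form, and it is only a simulation of the running-sum behaviour, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningWordCountSketch {

    /** Simulates flatMap(Tokenizer) -> keyBy(word) -> sum(1) on a list of input lines. */
    static List<String> process(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>(); // per-key running sum
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {
                // Each incoming word updates its key's sum and emits the new total.
                int updated = counts.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Lines as they might be typed into the nc terminal.
        for (String record : process(Arrays.asList("hello world", "hello flink"))) {
            System.out.println(record);
        }
    }
}
```

Because the Java job has no window, counts keep growing for as long as the job runs; the Scala job, by contrast, resets its counts with every window.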