Flink操练(四十一)之周期水位线窗口
关键代码env。getConfig()。setAutoWatermarkInterval(601000L);完整代码packageday04;importorg。apache。flink。api。common。eventtime。SerializableTimestampAssigner;importorg。apache。flink。api。common。eventtime。WatermarkStrategy;importorg。apache。flink。api。common。functions。MapFunction;importorg。apache。flink。api。java。tuple。Tuple2;importorg。apache。flink。streaming。api。datastream。DataStream;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。streaming。api。functions。windowing。ProcessWindowFunction;importorg。apache。flink。streaming。api。windowing。assigners。TumblingEventTimeWindows;importorg。apache。flink。streaming。api。windowing。time。Time;importorg。apache。flink。streaming。api。windowing。windows。TimeWindow;importorg。apache。flink。util。Collector;importjava。sql。Timestamp;importjava。time。Duration;program:bigDatalearndescription:水位线测试author:Mr。逗create:2021092409:30publicclassWaterTest2{publicstaticvoidmain(String〔〕args){StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。setParallelism(1);每隔1分钟插入一次水位线env。getConfig()。setAutoWatermarkInterval(601000L);DataStreamSourceStringsourceenv。socketTextStream(172。17。0。50,9999);DataStreamTuple2String,Longmapsource。map(newMapFunctionString,Tuple2String,Long(){OverridepublicTuple2String,Longmap(Stringv)throwsException{returnTuple2。of(v。split()〔0〕,Long。parseLong(v。split()〔1〕)1000L);}});默认每隔200ms的机器时间,插入一次水位线DataStreamTuple2String,Longwatermarksmap。assignTimestampsAndWatermarks(WatermarkStrategy。Tuple2String,LongforBoundedOutOfOrderness(Duration。ofSeconds(0))。withTimestampAssigner(newSerializableTimestampAssignerTuple2String,Long(){OverridepubliclongextractTimestamp(Tuple2String,Longv,longl){returnv。f1;告诉flink事件时间是哪一个字段}}));DataStreamStringprocesswatermarks。keyBy(vv。f0)5秒的事件时间滚动窗口。window(TumblingEventTimeWindows。of(Time。seconds(5)))。process(newProcessWindowFunctionTuple2String,Long,String,String,TimeWindow(){Overridepublicvoidprocess(Stringkey,Contextctx,IterableTuple2String,Longit,CollectorStringout)throwsException{longstartctx。window()。getStart();longendctx。window()。getEnd();longcountit。spliterator()。getExactSizeIfKnown();迭代器里面共有多少元素out。collect(用户:key在窗口newTimestamp(start)newTimestamp(end)中的pv次数是:count);}});process。print();StringnameWaterTest2。class。getName();try{env。execute(name);}catch(Exceptione){e。printStackTrace();}}}
Hadoop时代落幕,谁是大数据时代新宠?随着2022年3月这个Cloudera宣布停止对CDH技术支持日子越来越近,那些已经部署CDH和其他版本Hadoop的企业面临一个迫切的问题:自己原来部署的Hadoop怎么办?……
防骗移动电信联通运营商限制境外骚扰电话方法近期频繁接收到网络电话诈骗及境外电话,小编整理拦截境外来电骚扰的方法分享给大家,国内三大运营商用户均适用。亲测有效!!!移动用户:1。编辑短信KTFSR到10086,免费……
最低售价999元!华为三款新品同时震撼发布频繁活跃在各大社交平台的朋友们应该对于华为和苹果的新品发布会十分感兴趣!毕竟从全球范围来看,华为和苹果已经拥有随时一决高下的强悍实力,苹果的新品发布会已经成功召开了,大众等了很……
刘强东和王微控制权的正反两个典型土豆网的王微在控制权方面,他还是不太注意的一个人,土豆网上线8个月的时候,就获得IDG的A轮。A轮只花了50万美金,这简直就是毛毛雨了。但是换了土豆网多少股权?30当总编……
Laravelartisan命令中文解释phpartisanV获取laravel当前版本号phpartisanclearcompiled删除编译后的类文件phpartisancompletion转储shell完成脚本……
亚马逊KindlePaperwhite跌至71美元目前的KindlePaperwhite电子阅读器在亚马逊上已降至71万美元,比起130美元的标价低59美元。它比以前的版本更薄、更轻10。它采用齐平正面设计,背面由……
从X到13,苹果手机都进行了哪些提升和改变随着2021年9月苹果凌晨发布会的结束,新一波果粉买买买热潮或许会来的更凶猛一些。曾经深度体验过iPhoneX(2017年9月13日发布),转眼间已经进行了3波升级换代。作为安……
红米K50Pro首发天玑2000,5100mAh80W,有1今年联发科只发布了6nm芯片,具体为天玑1100和1200,在市场上并没有一款像样的5nm芯片面世。因此,在对比高通5nm芯片骁龙888的时候,确实处于弱势,在性能上人们也普遍……
久违了!iPadmini6正式发布配全面屏支持TouchID如果iPadmini6缺席本次苹果秋季发布会,不知道有多少果粉要捶胸顿足了,但iPadmini6没有缺席。它不仅没有缺席,还带来了全新面貌。iPadmini6屏幕方……
变了芯的华为路由AX3Pro你还要吗?AX3Pro目前存在新老版本混发的状况,商品详情页并没有注明差异点,差异在CPU:新版高通双核1GHz,老版凌霄四核1。4GHz,真不知道厂家怎么想的?客服还说不会影响产品整体……
荣耀有没有可能抢回丢掉的市场?我是靓小伟,很高兴回答你的问题!我觉得荣耀还是有机会提高市场占有率的。因为荣耀这个品牌形象还是很不错的。不过,光有好的品牌是不够的,希望荣耀可以把自己的手机定……
一套防沉迷系统引发的血案8月30日晚,国家新闻出版署下发通知,要求所有网络游戏企业仅可在周五、周六、周日和法定节假日每日20时至21时向未成年人提供1小时服务。这意味着,正常情况下,未成年玩家每周游戏……