1、代码逻辑实现packageday02;importorg。apache。flink。api。common。functions。FilterFimportorg。apache。flink。api。common。functions。FlatMapFimportorg。apache。flink。streaming。api。datastream。DataStreamSimportorg。apache。flink。streaming。api。environment。StreamExecutionEimportorg。apache。flink。streaming。api。functions。source。SourceFimportorg。apache。flink。util。Cimportjava。util。Cimportjava。util。Rprogram:Flinklearndescription:自定义filterauthor:Mr。逗create:2021091710:04publicclassSelfFilter{1、事件PrJopublicstaticclassEvent{publicSpublicSpublicLpublicEvent(){}publicEvent(Stringuser,Stringurl,Longtimestamp){this。this。this。}OverridepublicStringtoString(){returnEvent{useruser,urlurl,timestamptimestamp};}}2、自定义数据源publicstaticclassClickSourceimplementsSourceFunctionEvent{privateString〔〕userArr{Mary,Bob,Alice,Liz};privateString〔〕urlArr{。home,。cart,。fav,。prod?id1,。prod?id2};privateRandomrandomnewRandom();Overridepublicvoidrun(SourceContextEventctx)throwsException{while(running){ctx。collect(newEvent(userArr〔random。nextInt(userArr。length)〕,urlArr〔random。nextInt(urlArr。length)〕,Calendar。getInstance()。getTimeInMillis()));Thread。sleep(1000);}}Overridepublicvoidcancel(){}}3、自定义filterpublicstaticclassMyFilterimplementsFilterFunctionEvent{Overridepublicbooleanfilter(Eventv)throwsException{returnv。user。equals(Mary);}}publicstaticvoidmain(String〔〕args)throwsException{StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();DataStreamSourceEventstreamSourceenv。addSource(newClickSource());1、filter(rr。user。equals(Mary))streamSource。filter(vv。user。equals(Mary))。print();2、FilterFunctionstreamSource。filter(newFilterFunctionEvent(){Overridepublicbooleanfilter(Eventv)throwsException{returnv。user。equals(Mary);}})。print();3、newMyFilterstreamSource。filter(newMyFilter())。print();4、flatMapstreamSource。flatMap(newFlatMapFunctionEvent,Event(){OverridepublicvoidflatMap(Eventevent,CollectorEventout)throwsException{if(event。user。equals(Mary))out。collect(event);}})。print();StringnameSelfFilter。class。getName();env。execute(name);}}2、结果之展示5Event{userMary,url。home,timestamp1631933148157}5Event{userMary,url。home,timestamp1631933148157}8Event{userMary,url。home,timestamp1631933148157}2Event{userMary,url。home,timestamp1631933148157}
造句:操练二十五之自定义
造句:操练二十五之自定义