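I. Overview

At its core, Flink is a streaming dataflow execution engine. On a single Flink runtime it supports both stream-processing and batch-processing applications, and for distributed computation over data streams it provides data distribution, data communication, and fault tolerance.

Flink website: https://flink.apache.org
Documentation for all versions: https://nightlies.apache.org/flink/
Official Flink on K8s documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/
See also my earlier article: Big Data Hadoop: the Flink real-time stream computing engine (Flink environment deployment)
GitHub: https://github.com/apache/flink/tree/release-1.14.6

II. Flink deployment modes

Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/overview/

Flink on YARN has three deployment modes:
- yarn-session mode (Session Mode)
- yarn-cluster mode (Per-Job Mode)
- Application mode (Application Mode)

[Tip] Per-Job mode is deprecated. It is supported only by YARN, was deprecated in Flink 1.15, and will be dropped in FLINK-26000.

III. Flink on K8s in practice

1) Download Flink

Download page: https://flink.apache.org/downloads.html

    wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz

2) Build the base image

    docker pull apache/flink:1.14.6-scala_2.12
    docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
    docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12

3) Session mode

A Flink Session cluster runs as a long-running Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job is submitted to the cluster after the cluster has been deployed.

A Flink Session cluster deployment in Kubernetes has at least three components:
- a Deployment that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service that exposes the JobManager's REST and UI ports

1. Native Kubernetes mode

Configuration options: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

[1] Build the image

Dockerfile:

    FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
    # Switch the container time zone to Asia/Shanghai
    RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
    # Note: an export inside RUN only lasts for that build step; use ENV if the locale must be set in the running container
    RUN export LANG=zh_CN.UTF-8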
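The download URL, the upstream image tag, and the Harbor tag above all embed the same Flink version and Scala suffix. A minimal sketch of deriving them from two values, so that a version bump happens in one place; `flink_tarball_url` and `flink_image_ref` are hypothetical helper names, not part of Flink or Docker.

```shell
#!/usr/bin/env bash
# Hypothetical helpers; only the URL and tag patterns come from the commands above.

# Print the download URL of the binary release for a Flink and Scala version.
flink_tarball_url() {
  local flink_version=$1 scala_version=$2
  echo "https://dlcdn.apache.org/flink/flink-${flink_version}/flink-${flink_version}-bin-scala_${scala_version}.tgz"
}

# Print an image reference of the form <repo>:<flink>-scala_<scala>.
flink_image_ref() {
  local repo=$1 flink_version=$2 scala_version=$3
  echo "${repo}:${flink_version}-scala_${scala_version}"
}

flink_tarball_url 1.14.6 2.12
flink_image_ref apache/flink 1.14.6 2.12
flink_image_ref myharbor.com/bigdata/flink 1.14.6 2.12
```

The last two lines print the source and target references used by `docker tag` in step 2.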
Build the image:

    docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache

Push the image:

    docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12

[2] Create the namespace and service account

    # Create the namespace
    kubectl create ns flink
    # Create the service account
    kubectl create serviceaccount flink-service-account -n flink
    # Grant permissions
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Create the Flink cluster

    ./bin/kubernetes-session.sh \
        -Dkubernetes.cluster-id=my-first-flink-cluster \
        -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
        -Dkubernetes.namespace=flink \
        -Dkubernetes.jobmanager.service-account=flink-service-account \
        -Dkubernetes.rest-service.exposed.type=NodePort

[4] Submit a job

    ./bin/flink run \
        --target kubernetes-session \
        -Dkubernetes.cluster-id=my-first-flink-cluster \
        -Dkubernetes.namespace=flink \
        -Dkubernetes.jobmanager.service-account=flink-service-account \
        ./examples/streaming/TopSpeedWindowing.jar

Resource parameters used with ./examples/streaming/WordCount.jar. Place -D options before the jar path, because anything after the jar is passed to the job as a program argument:

    -Dkubernetes.taskmanager.cpu=2000m
    -Dexternal-resource.limits.kubernetes.cpu=4000m
    -Dexternal-resource.limits.kubernetes.memory=10Gi
    -Dexternal-resource.requests.kubernetes.cpu=2000m
    -Dexternal-resource.requests.kubernetes.memory=8Gi

[Tip] Mind the JDK version; JDK 8 currently works.

[5] Check the cluster

    kubectl get pods -n flink
    kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink

[6] Delete the Flink cluster

    kubectl delete deployment/my-first-flink-cluster -n flink
    kubectl delete ns flink --force

2. Standalone mode

[1] Build the image

The default user in the official image is flink; here I switch to admin. Change the user to suit your organization. The startup script can be copied from one of the pods started above.
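One way to copy the startup script out of a running pod is sketched below. It assumes the official image keeps the script at /docker-entrypoint.sh and that a pod from the session cluster is still running; `fetch_entrypoint` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: save the entrypoint script of a running pod to a local file.
# Assumption: the official Flink image stores it at /docker-entrypoint.sh.

fetch_entrypoint() {
  local namespace=$1 pod=$2 out=${3:-docker-entrypoint.sh}
  kubectl exec -n "$namespace" "$pod" -- cat /docker-entrypoint.sh > "$out"
}

# Example (requires a running session cluster):
# fetch_entrypoint flink my-first-flink-cluster-taskmanager-1-1
```

After fetching the script, replace the user name flink with admin in the `su-exec` and `gosu` lines.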
Startup script docker-entrypoint.sh:

    #!/usr/bin/env bash

    ###############################################################################
    #  Licensed to the Apache Software Foundation (ASF) under one
    #  or more contributor license agreements.  See the NOTICE file
    #  distributed with this work for additional information
    #  regarding copyright ownership.  The ASF licenses this file
    #  to you under the Apache License, Version 2.0 (the
    #  "License"); you may not use this file except in compliance
    #  with the License.  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    ###############################################################################

    COMMAND_STANDALONE="standalone-job"
    COMMAND_HISTORY_SERVER="history-server"

    # If unspecified, the hostname of the container is taken as the JobManager address
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

    drop_privs_cmd() {
        if [ $(id -u) != 0 ]; then
            # Don't need to drop privs if EUID != 0
            return
        elif [ -x /sbin/su-exec ]; then
            # Alpine
            echo su-exec admin
        else
            # Others
            echo gosu admin
        fi
    }

    copy_plugins_if_required() {
      if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
      fi

      echo "Enabling required built-in plugins"
      for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
          echo "Plugin ${target_plugin} does not exist. Exiting."
          exit 1
        else
          ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
          echo "Successfully enabled ${target_plugin}"
        fi
      done
    }

    set_config_option() {
      local option=$1
      local value=$2

      # escape periods for usage in regular expressions
      local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

      # either override an existing entry, or append a new one
      if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
            sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
      else
            echo "${option}: ${value}" >> "${CONF_FILE}"
      fi
    }

    prepare_configuration() {
        set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
        set_config_option blob.server.port 6124
        set_config_option query.server.port 6125

        if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
            set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
        fi

        if [ -n "${FLINK_PROPERTIES}" ]; then
            echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
        fi
        envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
    }

    maybe_enable_jemalloc() {
        if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
            JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
            JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
            if [ -f "$JEMALLOC_PATH" ]; then
                export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
            elif [ -f "$JEMALLOC_FALLBACK" ]; then
                export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
            else
                if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                    MSG_PATH=$JEMALLOC_PATH
                else
                    MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
                fi
                echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
            fi
        fi
    }

    maybe_enable_jemalloc

    copy_plugins_if_required

    prepare_configuration

    args=("$@")
    if [ "$1" = "help" ]; then
        printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
        printf "    Or $(basename "$0") help\n\n"
        printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
        exit 0
    elif [ "$1" = "jobmanager" ]; then
        args=("${args[@]:1}")

        echo "Starting Job Manager"

        exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
    elif [ "$1" = ${COMMAND_STANDALONE} ]; then
        args=("${args[@]:1}")

        echo "Starting Job Manager"

        exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
    elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
        args=("${args[@]:1}")

        echo "Starting History Server"

        exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
    elif [ "$1" = "taskmanager" ]; then
        args=("${args[@]:1}")

        echo "Starting Task Manager"

        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
    fi

    args=("${args[@]}")

    # Running command in pass-through mode
    exec $(drop_privs_cmd) "${args[@]}"
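The `set_config_option` function in the script either overrides an existing key in flink-conf.yaml or appends a new one. The sketch below runs that logic against a throwaway file instead of the real configuration. It assumes GNU sed (for `sed -i` without a suffix argument), and the temporary file merely stands in for `${FLINK_HOME}/conf/flink-conf.yaml`.

```shell
#!/usr/bin/env bash
# Demo of the override-or-append logic on a temporary config file.
CONF_FILE=$(mktemp)
printf 'jobmanager.rpc.address: localhost\n' > "$CONF_FILE"

set_config_option() {
  local option=$1
  local value=$2
  # Escape periods so they match literally in the regular expressions below.
  local escaped_option
  escaped_option=$(echo "${option}" | sed -e 's/\./\\./g')
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
    # Key already present: rewrite the whole line.
    sed -i -e "s/${escaped_option}:.*/${option}: ${value}/g" "${CONF_FILE}"
  else
    # Key absent: append it.
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

set_config_option jobmanager.rpc.address flink-jobmanager   # overrides the existing key
set_config_option blob.server.port 6124                     # appends a new key
cat "$CONF_FILE"
```

Because the value is spliced into a sed expression that uses `/` as its delimiter, a value containing a slash would break the substitution. The keys set in `prepare_configuration` do not contain slashes.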
Dockerfile:

    FROM myharbor.com/bigdata/centos:7.9.2009

    USER root

    # Install common tools
    RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

    # Set the time zone (the default is UTC)
    RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

    RUN mkdir -p /opt/apache

    ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
    ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

    ENV FLINK_HOME=/opt/apache/flink-1.14.6
    ENV JAVA_HOME=/opt/apache/jdk1.8.0_212
    ENV PATH=$JAVA_HOME/bin:$PATH

    # Directory for user application jars
    RUN mkdir $FLINK_HOME/usrlib/

    # -p keeps this step from failing when /home already exists
    RUN mkdir -p /home

    COPY docker-entrypoint.sh /opt/apache/
    RUN chmod +x /opt/apache/docker-entrypoint.sh

    RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

    RUN chown -R admin:admin /opt/apache

    # Working directory
    WORKDIR $FLINK_HOME

    # Exposed ports
    EXPOSE 6123 8081

    # Entrypoint script: not run at build time, only when a container starts
    ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
    CMD ["help"]

Build the image:

    docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

Push the image:

    docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

Remove the image:

    docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
    crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

[2] Create the namespace and service account

    # Create the namespace
    kubectl create ns flink
    # Create the service account
    kubectl create serviceaccount flink-service-account -n flink
    # Grant permissions
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Write the YAML files

flink-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 3200m
        taskmanager.memory.process.size: 2728m
        taskmanager.memory.flink.size: 2280m
        parallelism.default: 2
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender

        # Uncomment this if you want to _only_ change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO

        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name = org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO

        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size = 100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10

        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF

jobmanager-service.yaml: optional Service, needed only in non-HA mode.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink
        component: jobmanager

jobmanager-rest-service.yaml: optional Service that exposes the JobManager REST port as a port on the Kubernetes nodes.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081
        targetPort: 8081
        nodePort: 30081
      selector:
        app: flink
        component: jobmanager

taskmanager-query-state-service.yaml: optional Service that exposes the TaskManager port for queryable state as a port on the Kubernetes nodes.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-taskmanager-query-state
    spec:
      type: NodePort
      ports:
      - name: query-state
        port: 6125
        targetPort: 6125
        nodePort: 30025
      selector:
        app: flink
        component: taskmanager
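The two NodePort Services above pin nodePort to 30081 and 30025. Kubernetes only accepts such values inside the cluster's NodePort range. A small sketch of checking a value before applying a manifest, assuming the default range of 30000-32767 (a cluster administrator may have changed it); `is_valid_nodeport` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Assumption: the cluster uses the default NodePort range 30000-32767.
is_valid_nodeport() {
  local port=$1
  [[ $port =~ ^[0-9]+$ ]] && (( port >= 30000 && port <= 32767 ))
}

for port in 30081 30025 8081; do
  if is_valid_nodeport "$port"; then
    echo "$port: inside the default NodePort range"
  else
    echo "$port: outside the default NodePort range"
  fi
done
```

8081 is included only as a counterexample: it is valid as `port` and `targetPort`, but not as `nodePort`.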
The configuration files above are shared by the Session and Application deployments.

jobmanager-session-deployment-non-ha.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.14.6/conf/
            securityContext:
              runAsUser: 9999  # UID of the admin user created in the Dockerfile; change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

taskmanager-session-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.14.6/conf/
            securityContext:
              runAsUser: 9999  # UID of the admin user created in the Dockerfile; change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

[4] Create the Flink cluster

    kubectl create ns flink
    # Configuration and service definition
    kubectl create -f flink-configuration-configmap.yaml -n flink
    # Services
    kubectl create -f jobmanager-service.yaml -n flink
    kubectl create -f jobmanager-rest-service.yaml -n flink
    kubectl create -f taskmanager-query-state-service.yaml -n flink
    # Create the deployments for the cluster
    kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
    kubectl create -f taskmanager-session-deployment.yaml -n flink
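The capacity of this Standalone session cluster follows from two settings: the TaskManager replica count and taskmanager.numberOfTaskSlots. A worked sketch with the values from the manifests above; the "jobs that fit" figure rests on the simplifying assumption that each job runs at parallelism.default with default slot sharing, so it occupies as many slots as its parallelism.

```shell
#!/usr/bin/env bash
# Values taken from the manifests above.
taskmanager_replicas=2        # spec.replicas of flink-taskmanager
slots_per_taskmanager=2       # taskmanager.numberOfTaskSlots
default_parallelism=2         # parallelism.default

total_slots=$(( taskmanager_replicas * slots_per_taskmanager ))
echo "total task slots: ${total_slots}"

# Simplifying assumption: one job at default parallelism occupies that many slots.
concurrent_jobs=$(( total_slots / default_parallelism ))
echo "jobs at default parallelism that fit: ${concurrent_jobs}"
```

To raise capacity, scale the flink-taskmanager Deployment or increase the slots per TaskManager.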
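Reverse-engineer a Dockerfile from an image:

    alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
    whaler flink:1.14.6-scala_2.12

Check the cluster:

    kubectl get pods,svc -n flink -owide

Web UI: http://192.168.182.110:30081/#/overview

[5] Submit a job

    ./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar

    kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink

[6] Delete the Flink cluster

    kubectl delete -f jobmanager-service.yaml -n flink
    kubectl delete -f flink-configuration-configmap.yaml -n flink
    kubectl delete -f taskmanager-session-deployment.yaml -n flink
    kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
    kubectl delete ns flink --force

[7] Access the Flink web UI

The port is the nodePort defined in jobmanager-rest-service.yaml:

http://192.168.182.110:30081/#/overview

4) Application mode (recommended)

A basic Flink Application cluster deployment in Kubernetes has three components:
- an application that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service that exposes the JobManager's REST and UI ports

1. Native Kubernetes mode (commonly used)

[1] Build the image

Dockerfile:

    FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
    # Switch the container time zone to Asia/Shanghai
    RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
    # Note: an export inside RUN only lasts for that build step; use ENV if the locale must be set in the running container
    RUN export LANG=zh_CN.UTF-8
    # Bundle the job jar into the image
    RUN mkdir -p $FLINK_HOME/usrlib
    COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/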
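The jar copied into usrlib by this Dockerfile is referenced by a local:// URI when the application is submitted. A minimal sketch of deriving that URI, assuming FLINK_HOME is /opt/flink inside the official image; `local_jar_uri` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Assumption: FLINK_HOME is /opt/flink inside the official image.
FLINK_HOME_IN_IMAGE=/opt/flink

# Hypothetical helper: print the local:// URI for a jar copied into usrlib.
local_jar_uri() {
  local jar_name=$1
  echo "local://${FLINK_HOME_IN_IMAGE}/usrlib/${jar_name}"
}

local_jar_uri TopSpeedWindowing.jar
```

The path is resolved inside the JobManager container, so it must match the COPY destination in the Dockerfile, not a path on the machine that runs `flink run-application`.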
Build the image:

    docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache

Push the image:

    docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

Remove the image:

    docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
    crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

[2] Create the namespace and service account

    # Create the namespace
    kubectl create ns flink
    # Create the service account
    kubectl create serviceaccount flink-service-account -n flink
    # Grant permissions
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Create the Flink cluster and submit the job

    ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=my-first-application-cluster \
        -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
        -Dkubernetes.jobmanager.replicas=1 \
        -Dkubernetes.namespace=flink \
        -Dkubernetes.jobmanager.service-account=flink-service-account \
        -Dexternal-resource.limits.kubernetes.cpu=2000m \
        -Dexternal-resource.limits.kubernetes.memory=2Gi \
        -Dexternal-resource.requests.kubernetes.cpu=1000m \
        -Dexternal-resource.requests.kubernetes.memory=1Gi \
        -Dkubernetes.rest-service.exposed.type=NodePort \
        local:///opt/flink/usrlib/TopSpeedWindowing.jar

[Note] local is the only scheme supported in Application mode. It refers to the local environment of the pod or container, not the host machine.

Check the cluster:

    kubectl get pods,svc -n flink

    kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink

[4] Delete the Flink cluster

    kubectl delete deployment/my-first-application-cluster -n flink
    kubectl delete ns flink --force

2. Standalone mode

[1] Build the image
Startup script docker-entrypoint.sh: identical to the script listed under 3) Session mode, 2. Standalone mode, [1] Build the image.
Dockerfile: the same as the Dockerfile listed under 3) Session mode, 2. Standalone mode; it builds the same flink-centos-admin image. The entrypoint script is copied to /opt/apache/, so the chmod step must target /opt/apache/docker-entrypoint.sh rather than a path under $FLINK_HOME.

Build the image:

    docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

Push the image:

    docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

Remove the image:

    docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

[2] Create the namespace and service account

    # Create the namespace
    kubectl create ns flink
    # Create the service account
    kubectl create serviceaccount flink-service-account -n flink
    # Grant permissions
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Write the YAML files
flink-configuration-configmap.yaml: identical to the ConfigMap listed under 3) Session mode, 2. Standalone mode, [3] Write the YAML files.
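The flink-conf.yaml in this ConfigMap sets both taskmanager.memory.process.size and taskmanager.memory.flink.size, so the gap between them has to cover JVM metaspace and JVM overhead. A worked sketch of that arithmetic, assuming the metaspace size and the overhead bounds are left at their Flink defaults (256m, and 192m to 1g); those defaults are not stated in the article.

```shell
#!/usr/bin/env bash
# Values from flink-conf.yaml in this guide (MiB).
process_size=2728     # taskmanager.memory.process.size
flink_size=2280       # taskmanager.memory.flink.size
# Assumption: taskmanager.memory.jvm-metaspace.size keeps its default of 256m.
metaspace=256

jvm_overhead=$(( process_size - flink_size - metaspace ))
echo "implied JVM overhead: ${jvm_overhead}m"

# Assumption: default bounds taskmanager.memory.jvm-overhead.min=192m and max=1g.
if (( jvm_overhead >= 192 && jvm_overhead <= 1024 )); then
  echo "within the default JVM overhead bounds"
else
  echo "outside the default JVM overhead bounds"
fi
```

If only one of the two sizes is changed later, rerun the arithmetic: a gap that falls outside the overhead bounds is likely to surface as a memory configuration error at startup.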
jobmanager-service.yaml (optional Service, needed only in non-HA mode), jobmanager-rest-service.yaml (optional Service exposing the JobManager REST port as a node port), and taskmanager-query-state-service.yaml (optional Service exposing the TaskManager queryable-state port as a node port) are identical to the Service manifests listed under 3) Session mode, 2. Standalone mode.

jobmanager-application-non-ha.yaml (non-HA)

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: flink-jobmanager
    spec:
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          restartPolicy: OnFailure
          containers:
            - name: jobmanager
              image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
              env:
              args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
              ports:
                - containerPort: 6123
                  name: rpc
                - containerPort: 6124
                  name: blob-server
                - containerPort: 8081
                  name: webui
              livenessProbe:
                tcpSocket:
                  port: 6123
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:
                - name: flink-config-volume
                  mountPath: /opt/apache/flink-1.14.6/conf
                - name: job-artifacts-volume
                  mountPath: /opt/apache/flink-1.14.6/usrlib
              securityContext:
                runAsUser: 9999  # UID of the admin user created in the Dockerfile; change if necessary
          volumes:
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                  - key: flink-conf.yaml
                    path: flink-conf.yaml
                  - key: log4j-console.properties
                    path: log4j-console.properties
            - name: job-artifacts-volume
              hostPath:
                path: /mnt/nfsdata/flink/application/job-artifacts

[Tip] Note the mount here, /mnt/bigdata/flink/usrlib. It is best to use a shared directory, so that the job jar is available on whichever node the pod is scheduled to.
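Because the job jar reaches the pods through the hostPath volume, it has to be placed in that directory before the Job is created. A minimal sketch of that staging step; `stage_job_artifact` is a hypothetical helper, and the example path is the hostPath from the manifest, so adjust it to your own shared directory.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy a job jar into the directory backing job-artifacts-volume.
stage_job_artifact() {
  local jar=$1 artifacts_dir=$2
  # Refuse to continue when the jar is missing.
  [ -f "$jar" ] || { echo "jar not found: $jar" >&2; return 1; }
  mkdir -p "$artifacts_dir" && cp "$jar" "$artifacts_dir/"
}

# Example, run on the node (or NFS export) that backs the hostPath:
# stage_job_artifact ./examples/batch/WordCount.jar /mnt/nfsdata/flink/application/job-artifacts
```

With a plain hostPath that is not a shared mount, the same step would have to be repeated on every node that can schedule a JobManager or TaskManager pod.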
taskmanager-job-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
            env:
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.14.6/conf
            - name: job-artifacts-volume
              mountPath: /opt/apache/flink-1.14.6/usrlib
            securityContext:
              runAsUser: 9999  # UID of the admin user created in the Dockerfile; change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
          - name: job-artifacts-volume
            hostPath:
              path: /mnt/nfsdata/flink/application/job-artifacts

[4] Create the Flink cluster and submit the job

    kubectl create ns flink
    # Configuration and service definition
    kubectl create -f flink-configuration-configmap.yaml -n flink
    # Services
    kubectl create -f jobmanager-service.yaml -n flink
    kubectl create -f jobmanager-rest-service.yaml -n flink
    kubectl create -f taskmanager-query-state-service.yaml -n flink
    # Create the deployments for the cluster
    kubectl create -f jobmanager-application-non-ha.yaml -n flink
    kubectl create -f taskmanager-job-deployment.yaml -n flink

Check the cluster:

    kubectl get pods,svc -n flink

[5] Delete the Flink cluster

    kubectl delete -f flink-configuration-configmap.yaml -n flink
    kubectl delete -f jobmanager-service.yaml -n flink
    kubectl delete -f jobmanager-rest-service.yaml -n flink
    kubectl delete -f taskmanager-query-state-service.yaml -n flink
    kubectl delete -f jobmanager-application-non-ha.yaml -n flink
    kubectl delete -f taskmanager-job-deployment.yaml -n flink
    kubectl delete ns flink --force

[6] Check

    kubectl get pods,svc -n flink
    kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash

That concludes this walkthrough of Flink on K8s. It only uses the official examples; real enterprise cases will follow. If you have any questions, leave me a comment. I will keep sharing tutorials in the [cloud-native big data] series, so stay tuned.