1部署方式1。1独立集群 独立集群包含至少一个master进程,以及至少一个TaskManager进程,TaskManager进程运行在一台或者多台机器上。所有的进程都是JVM进程。下图展示了独立集群的部署。 master进程在不同的线程中运行了一个Dispatcher和一个ResourceManager。一旦它们开始运行,所有TaskManager都将在Resourcemanager中进行注册。下图展示了一个任务如何提交到一个独立集群中去。 客户端向Dispatcher提交了一个任务,Dispatcher将会启动一个作业管理器线程,并提供执行所需的JobGraph。作业管理器向ResourceManager请求必要的taskslots。一旦请求的slots分配好,作业管理器就会部署job。 在standalone这种部署方式中,master和worker进程在失败以后,并不会自动重启。如果有足够的slots可供使用,job是可以从一次worker失败中恢复的。只要我们运行多个worker就好了。但如果job想从master失败中恢复的话,则需要进行高可用(HA)的配置了。 部署步骤 下载压缩包 链接:http:mirror。bit。edu。cnapacheflinkflink1。11。0flink1。11。0binscala2。11。tgz 解压缩tarxvfzflink1。11。0binscala2。11。tgz 启动集群cdflink1。11。0。binstartcluster。sh 检查集群状态可以访问:http:localhost:8081 部署分布式集群所有运行TaskManager的机器的主机名(或者IP地址)都需要写入。confslaves文件中。startcluster。sh脚本需要所有机器的无密码的SSH登录配置,方便启动TaskManager进程。Flink的文件夹在所有的机器上都需要有相同的绝对路径。运行master进程的机器的主机名或者IP地址需要写在。confflinkconf。yaml文件的jobmanager。rpc。address配置项。 一旦部署好,我们就可以调用。binstartcluster。sh命令启动集群了,脚本会在本地机器启动一个作业管理器,然后在每个slave机器上启动一个TaskManager。停止运行,请使用。binstopcluster。sh。1。2ApacheHadoopYarn YARN是ApacheHadoop的资源管理组件。用来计算集群环境所需要的CPU和内存资源,然后提供给应用程序请求的资源。 Flink在YARN上运行,有两种模式:job模式和session模式。在job模式中,Flink集群用来运行一个单独的job。一旦job结束,Flink集群停止,并释放所有资源。下图展示了Flink的job如何提交到YARN集群。 当客户端提交任务时,客户端将建立和YARNResourceManager的连接,然后启动一个新的YARN应用的master进程,进程中包含一个作业管理器线程和一个ResourceManager。作业管理器向ResourceManager请求所需要的slots,用来运行Flink的job。接下来,Flink的ResourceManager将向Yarn的ResourceManager请求容器,然后启动TaskManager进程。一旦启动,TaskManager会将slots注册在Flink的ResourceManager中,Flink的ResourceManager将把slots提供给作业管理器。最终,作业管理器把job的任务提交给TaskManager执行。 sesison模式将启动一个长期运行的Flink集群,这个集群可以运行多个job,需要手动停止集群。如果以session模式启动,Flink将会连接到YARN的ResourceManager,然后启动一个master进程,包括一个Dispatcher线程和一个Flink的ResourceManager的线程。下图展示了一个FlinkYARNsession的启动。 当一个作业被提交运行,分发器将启动一个作业管理器线程,这个线程将向Flink的资源管理器请求所需要的slots。如果没有足够的slots,Flink的资源管理器将向YARN的资源管理器请求额外的容器,来启动TaskManager进程,并在Flink的资源管理器中注册。一旦所需slots可用,Flink的资源管理器将把slots分配给作业管理器,然后开始执行job。下图展示了job如何在session模式下执行。 无论是作业模式还是会话模式,Flink的ResourceManager都会自动对故障的TaskManager进行重启。你可以通过。confflinkconf。yaml配置文件来控制Flink在YARN上的故障恢复行为。例如,可以配置有多少容器发生故障后终止应用。 无论使用job模式还是sesison模式,都需要能够访问Hadoop。 job模式可以用以下命令来提交任务:。binflinkrunmyarncluster。pathtojob。jar 参数m用来定义提交作业的目标主机。如果加上关键字yarncluster,客户端会将作业提交到由Hadoop配置所指定的YARN集群上。Flink的CLI客户端还支持很多参数,例如用于控制TaskManager容器内存大小的参数等。有关它们的详细信息,请参阅文档。Flink集群的WebUI由YARN集群某个节点上的主进程负责提供。你可以通过YARN的WebUI对其进行访问,具体链接位置在TrackingURL:ApplicationMaster下的ApplicationOverview页面上。 session模式则是。binyarnsession。sh启动一个yarn会话。binflinkrun。pathtojob。jar向会话提交作业 Flink的WebUI链接可以从YARNWebUI的ApplicationOverview页面上找到。2高可用配置 Flink的高可用配置需要ApacheZooKeeper组件,以及一个分布式文件系统,例如HDFS等等。作业管理器将会把相关信息都存储在文件系统中,并将指向文件系统中相关信息的指针保存在ZooKeeper中。一旦失败,一个新的作业管理器将从ZooKeeper中指向相关信息的指针所指向的文件系统中读取元数据,并恢复运行。 配置文件编写highavailability。zookeeper。quorum:address1:2181〔,。。。〕,addressX:2181highavailability。storageDir:hdfs:flinkrecoveryhighavailability。zookeeper。path。root:flink2。1独立集群高可用配置 需要在配置文件中加一行集群标识符信息,因为可能多个集群共用一个zookeeper服务。highavailability。clusterid:cluster12。2yarn集群高可用配置 首先在yarn集群的配置文件yarnsite。xml中加入以下代码propertynameyarn。resourcemanager。am。maxattemptsnamevalue4valuedescriptionThemaximumnumberofapplicationmasterexecutionattempts。Defaultvalueis2,i。e。,anapplicationisrestartedatmostonce。descriptionproperty 然后在。confflinkconf。yaml加上yarn。applicationattempts:43与Hadoop集成 推荐两种方法下载包含hadoop的Flink版本。使用我们之前下载的Flink,然后配置Hadoop的环境变量。exportHADOOPCLASSPATH{hadoopclasspath} 我们还需要提供Hadoop配置文件的路径。只需设置名为HADOOPCONFDIR的环境变量就可以了。这样Flink就能够连上YARN的ResourceManager和HDFS了。4保存点操作。binflinksavepointjobId〔savepointPath〕 例如。binflinksavepointbc0b2ad61ecd4a615d92ce25390f61adhdfs:xxx:50070savepointsTriggeringsavepointforjobbc0b2ad61ecd4a615d92ce25390f61ad。Waitingforresponse。。。Savepointcompleted。Path:hdfs:xxx:50070savepointssavepointbc0b2a63cf5d5ccef8Youcanresumeyourprogramfromthissavepointwiththeruncommand。 删除保存点文件。binflinksavepointdsavepointPath 例子。binflinksavepointdhdfs:xxx:50070savepointssavepointbc0b2a63cf5d5ccef8Disposingsavepointhdfs:xxx:50070savepointssavepointbc0b2a63cf5d5ccef8。Waitingforresponse。。。Savepointhdfs:xxx:50070savepointssavepointbc0b2a63cf5d5ccef8disposed。5取消一个应用。binflinkcanceljobId 取消的同时做保存点操作。binflinkcancels〔savepointPath〕jobId 例如。binflinkcancelshdfs:xxx:50070savepointsd5fdaff43022954f5f02fcd8f25ef855Cancellingjobbc0b2ad61ecd4a615d92ce25390f61adwithsavepointtohdfs:xxx:50070savepoints。Cancelledjobbc0b2ad61ecd4a615d92ce25390f61ad。Savepointstoredinhdfs:xxx:50070savepointssavepointbc0b2ad08de07fbb10。6从保存点启动应用程序。binflinkrunssavepointPath〔options〕jobJar〔arguments〕7扩容,改变并行度操作。binflinkmodifyjobIdpnewParallelism 例子。binflinkmodifybc0b2ad61ecd4a615d92ce25390f61adp16Modifyjobbc0b2ad61ecd4a615d92ce25390f61ad。Rescaledjobbc0b2ad61ecd4a615d92ce25390f61ad。Itsnewparallelismis16。