总体介绍 众所周知,Flink在提交和运行Flink作业时,需要配置Flink资源信息,包括TaskManager的数量,每个TaskManager的CPU数、内存大小以及Slot数量。TaskManager的数量,每个TaskManager的CPU数、内存大小都比较容易理解,主要是配置启动的计算进程数以及每个进程绑定的物理资源大小。 那么Slot是什么?为什么需要在Flink作业启动时配置? 一言以蔽之,Slot是Flink集群管理资源的最小单位,也是Flink作业申请和释放资源的单位。本文主要分析Flink基于Slot的资源管理、作业资源申请以及释放流程。 阅读提示:字节跳动内部目前主要使用Flink1。11版本,所以本文分析的FlinkSlot资源管理实现部分内容将基于此版本展开。资源流程分析 Flink作业在运行过程中,整个Flink集群其实分为四个角色节点,分别为Dispatcher、JobMaster、ResourceManager以及TaskManager,其中Dispatcher、JobMaster以及ResourceManager在同一个进程内启动和执行。Dispatcher接收各类查询请求,例如作业的各类Metrics等;JobMaster是作业的AM,管理作业的执行状态;ResourceManager管理Flink集群的资源和资源分配;TaskManager管理Flink计算任务的执行。 Flink作业被提交到资源管理器(YarnK8s)后,资源管理器根据作业所需的资源配置(多少个TaskManager,每个TaskManager分配多少CPU内存)为作业分配资源,并启动对应数量的TaskManager进程。 TaskManager进程启动后,向ResourceManager节点注册信息,其中最关键的信息就是Slot。 TaskManager根据配置的每个TaskManager的Slot数,向ResourceManager汇报Slot,而在ResourceManager节点内维护和管理所有的Slot列表。我们可以简单地将Slot理解为资源槽,这个资源槽会在TaskManager上跟作业具体的计算任务关联。 Slot是一个逻辑概念,它不会跟具体的CPU核数绑定,例如一个TaskManager绑定了N个CPU和配置了M个Slot,N和M之间没有任何关联。 但是Slot跟内存资源相关,我们知道TaskManager启动时会指定进程的总内存大小,这块的内存会被分为堆内内存、堆外内存,其中堆外内存又被分为ManagedMemory和DirectMemory,对具体内存划分有兴趣的小伙伴可以通过Flink内存模型详细了解。 这里我们要说的是ManagedMemory,这部分内存不会预先分配,但是会按照Slot划分大小。简单来讲,就是将TaskManager中ManagedMemory总大小除以Slot的数量,就是每个Slot可以使用的ManagedMemory大小。 TaskManager的每个Slot关联多个计算任务,每个计算任务由独立的Java线程执行,所以多个计算线程会跟一个Slot关联,也就是多个计算线程会共享一个ManagedMemory内存。 Slot申请流程 上文提到,TaskManager根据配置的Slot数量,会向ResourceManager汇报它上面的Slot数据。ResourceManager节点在内部维护TaskManager列表,每个TaskManager分别有哪些Slot以及目前空闲的Slot集合。 Flink集群中的每个Flink作业会有一个JobMaster节点,JobMaster节点将Flink作业解析成物理执行计划,向ResourceManager申请Slot资源,同时管理作业中每个计算任务的执行状态。当一个作业提交到Flink集群后,Slot资源总体申请流程如下图所示。 TaskManager向ResourceManager汇报Slot资源数据;JobMaster向ResourceManager发起Slot资源申请;ResourceManager根据FreeSlots集合为Slot申请分配Slot,然后向TaskManager发起资源确认;TaskManager在进程内将指定Slot标记为指定作业使用,然后向JobMaster汇报Slot资源分配信息;JobMaster接收到Slot资源分配信息后,将Slot资源跟作业中的计算任务关联;JobMaster将计算任务描述信息发送到Slot所属的TaskManager,部署计算任务。 ResourceManager ResourceManager在SlotPoolManager中管理和维护TaskManager以及对应的Slot集合,每个Slot有三种状态:FREE、PENDING和ALLOCATED。 ResourceManager处理Slot申请是一个异步过程,ResourceManager接收到Slot申请后会先将请求放入到Pending列表中,然后给这些请求分配Slot,最后向TaskManager发送请求确认资源申请,完成确认后会更新分配的Slot状态。主要流程和操作如下图所示。 JobMaster JobMaster申请资源也是一个异步申请加回调确认的过程,主要通过SlotPool管理和实现资源申请。 SlotPool中管理4类Slot相关数据结构,分别为waitingForResourceManager列表结构、pendingRequests列表结构、AvailableSlots结构和AllocatedSlots结构。 1、waitingForResourceManager数据 Flink作业的JobMaster根据每个计算任务,生成一个Slot申请请求,并放入到一个waitingForResourceManager的请求列表内。这里需要注意的是会有Slot共享的问题,如果多个计算任务共享同一个Slot,那么这些计算任务只会生成一个Slot申请请求。 每个Slot请求会生成唯一的AllocationID,该ID会由ResourceManager发送给TaskManager,并最终返回给JobMaster。 当JobMaster跟ResourceManager建立连接时,从waitingForResourceManager中遍历获取每个Slot请求,然后逐个向ResourceManager发送Slot申请,同时将每个SlotRequest放入到PendingRequest列表中。 2、PendingRequests数据 在上文我们提到,JobMaster向ResourceManager发送申请Slot请求后,由于ResourceManager的异步申请机制,ResourceManager并不会直接返回申请到的Slot数据,所以JobMaster会将Slot请求放入到PendingRequest等待回调。 当JobMaster接收到TaskManager的offerSlots请求获取到申请的Slot信息时,才真正完成Slot的申请,在offerSlots请求中包含分配给该作业的Slot列表。JobMaster会遍历每个offerSlot,执行每个Slot的分配操作,具体可以分为以下几个步骤:根据上文中每个请求的AllocationID从pendingRequests中移除指定的R通过异步回调,将Slot分配给指定的计算任务(多个计算任务共享,则会分配多个计算任务);在AllocatedSlots数据结构中增加分配的Slot信息。 3、AllocatedSlots和AvailableSlots数据 AllocatedSlots存放已经被分配给计算任务的Slot信息列表,AvailableSlots存放还未被分配给计算任务的Slot信息列表。 由于存在超时重新申请等异常情况,例如JobMaster申请Slot超时重新发起申请请求,所以存在TaskManager向JobMaster返回的offerSlots根据AllocationID在pendingRequests中找不到对应请求,或者在LazyFromSource过程中上游计算任务执行完成需要释放Slot等情况,所以会将这些未被分配的Slot放入到AvailableSlots中。 当作业需要新的Slot分配给指定的计算任务时,会优先从AvailableSlots中查找Slot资源,只有未找到才会向ResourceManager发起请求。 在AvailableSlots中的每个Slot会带有一个时间戳,后台线程会定时检查AvailableSlots中的每个Slot,如果时间戳和当前时间超过一定阈值,该Slot会被主动释放掉,避免资源泄漏。 JobMaster中Slot资源申请操作流程如下图所示。 上述流程主要基于1。11版本分析Slot资源申请流程,在最新发布的1。14版本中Flink实现了Declarative资源申请,总体流程也是走JMRMTM,但是具体实现有比较大的简化。 TaskManager TaskManager中有两个数据结构跟作业申请资源相关:TaskSlotTable和JobTable。TaskSlotTable管理TaskManager中的Slot以及跟计算任务之间的关系,主要包含以下几类数据:TaskManager中的Slot数量;taskSlots,根据Slot索引号管理该Slot的状态(TaskSlot),TaskSlot里包含该Slot的计算任务列表等数据;allocatedSlots,根据AllocationID管理该Slot的状态(TaskSlot);taskSlotMappings,根据计算任务的ID(ExecutionAttemptID)管理计算任务和TaskSslotsPerJob,根据JobID管理属于该Job的AllocationID集合。 JobTable管理和JobMaster的连接信息,当TaskManager获取到指定作业的Slot申请时,根据JobMaster的地址跟JobMaster创建连接,向JobMaster注册,并将连接信息保存到JobTable中。 TaskManager在接收到ResourceManager发送过来的Slot申请后,会对Slot申请进行处理并更新TaskSlotTable,在这个过程中会将Slot申请加入到定时检查中,释放超时未分配成功的Slot资源。具体流程如下图所示。 这里比较重要的一个流程是接收到指定作业的Slot申请后,会跟作业创建连接,然后将TaskManager注册到JobMaster,JobMaster接收到注册信息后会跟TaskManager创建连接和心跳监控,TM和JM的心跳只监控连通性,相关流程如下图所示。 计算任务部署流程 JobMaster将Slot资源分配给计算任务后,生成计算任务的部署信息,部署信息里包含作业信息、计算任务信息、上下游Shuffle信息以及计算任务所部署的Slot索引号信息等,然后JobMaster将部署信息发送给指定的TaskManager。 TaskManager接收到计算任务部署信息后,对计算任务进行校验、部署和执行,这个过程涉及到TaskSlotTable以及JobTable操作,具体流程如下图所示。 TaskSlot有三个状态:ACTIVE:正在被指定的作业使用;ALLOCATED:创建时的初始状态,为某个作业创建,但是还没被使用;RELEASING:正在被释放中。 在TaskSlot创建时,会初始化一个MemoryManager,管理Slot中所有计算任务申请和释放ManagedMemory,共用TaskSlot的所有计算任务共享MemoryManager,TaskSlot管理了所有在上面运行的Task列表。 任务结束和Slot释放 TaskManager中的计算任务完成计算后,会释放该计算任务申请和使用的资源,涉及到Slot相关的主要以下几个操作:释放MemoryManager中指定计算任务申请的内存分片;从TaskSlotTable中移除管理的计算任务。 完成上述操作后,会向JobMaster发送计算任务更新。JobMaster收集到所有计算任务的更新消息后,完成作业执行并跟ResourceManager断开连接,然后遍历申请到的Slot并向指定的TaskManager发送资源释放请求。 TaskManager接受到Slot释放请求后,会从TaskSlotTable移除指定的Slot信息并向ResourceManager释放Slot信息,如果作业所有Slot都被删除,会关闭跟JobMaster的连接。TaskManager处理的总体流程如下图所示。 ResourceManager接收到指定Slot释放请求后,会从资源申请列表中查找与该Slot匹配的申请请求并处理,若申请列表没有请求则将Slot放入到空闲Slot列表中。资源管理优化 开源社区在1。11版本之后,对资源申请和释放流程具体的代码实现做过比较大的重构和优化。在资源管理和流程实现上,主要是支持细粒度资源管理和声明式资源申请。 细粒度资源管理 在上文我们提到,TaskManager会将ManagedMemory会按照里面的Slot进行等分,这会带来资源浪费。 作业的每个计算节点都可以设置不同的并发度,所以每个Slot内部执行的计算任务类型和数量有可能是不同的。这意味着每个Slot的计算任务所需的内存资源会存在比较大的不同,比如有些Slot有Join等计算任务,有些Slot没有。 原先按照Slot平均划分内存大小的方式会造成资源浪费,为了提升资源使用率,从Flink1。14版本开始支持细粒度资源申请。 JobMaster向ResourceManager申请Slot时,会向ResourceManager指定资源数量,包括CPU、内存等。ResourceManager根据当前维护的资源列表,为作业分配指定资源的Slot,同时向指定的TaskManager发送请求。TaskManager接收到带有资源信息的Slot申请后,会创建Slot,并向JobMaster确认资源申请。 所以从Slot总体申请流程上,新版本跟原先的处理是相同的,这块主要是支持在Slot申请时带上相应的资源信息,ResourceManager会根据管理的TaskManager剩余资源信息为计算任务分配TaskManager。 声明式资源申请 在上面的Slot申请流程中,一个Flink作业会申请若干个Slot资源,但是在申请过程中是按照单个Slot独立申请的。Flink作业,特别是流式作业,通常需要完成所有所需的Slot资源申请后,作业才能正常运行。 所以在Flink新版本中支持了声明式资源申请,JobMaster向ResourceManager申请资源时,会将所需的多个Slot打包成一个Batch,向ResourceManager发起资源申请。ResourceManager接收到作业的多个Slot申请后,会处理SlotManager中管理的资源,然后根据Slot逐个向指定的TaskManager发起资源请求。 每个作业所需的Slot数量,目前是在JobMaster资源申请时进行打包处理,后续可能会根据JobGraph执行计划中每个计算节点的并发度直接计算。总结 总体上来讲,Flink整个资源管理、申请和分配围绕Slot展开,同时每个TaskManager中的Slot数量决定了作业在该TaskManager中运行的并发计算任务数量。本篇文章主要介绍了Slot对资源分配、释放以及计算执行的影响,希望可以帮助大家更好地决策每个TaskManager中的Slot数量,对Flink作业进行调优。 目前,字节跳动流式计算团队同步支持的火山引擎流式计算Flink版正在公测中,支持云中立模式,支持公共云、混合云及多云部署,全面贴合企业上云策略,欢迎申请试用:流式计算Flink版火山引擎 相关资料:细粒度资源管理:https:cwiki。apache。orgconfluencedisplayFLINKFLIP563ADynamicSlotAllocation声明式资源申请:https:cwiki。apache。orgconfluencedisplayFLINKFLIP1383ADeclarativeResourcemanagement 欢迎关注公众号【字节跳动云原生计算】,获取更多技术干货和产品资讯!