Flink源码——Job 提交、部署流程源码分析之 Slot 管理(申请和释放)
2022-04-08 17:16:24 16 举报
Flink源码——Job 提交、部署流程源码分析之 Slot 管理(申请和释放)
作者其他创作
大纲/内容
* 注释: 申请 SingleSlot\t * 1、首先尝试从 JobMaster 的 SlotPool 中申请,看看是否能申请到,如果能,则进行分配即可\t * 2、如果不能,则尝试从 ResourceManager 去申请\t * 在一个 JobMaster 工作期间,它所申请到的所有的 Slot 都被管理在 SlotPool\t * JobMaster 委托 SlotPool 来进行 Slot 管理
计算这些 Slot 申请过程中对应 Slot 的 AllocationID
if(!pendingTaskManagerSlotOptional.isPresent())
分配资源以及部署Task
更改 Job 的状态
将 DefaultExecutionVertex 变成 ExecutionVertexDeploymentOption
构造一个 AllocationID
executionGraph.transitionToRunning();
* 流式作业默认调度方式: schedulingStrategy = EagerSchedulingStrategy\t\t * 1、EagerSchedulingStrategy(主要用于流式作业,所有顶点(ExecutionVertex)同时开始调度)\t\t * 2、LazyFromSourcesSchedulingStrategy(主要用于批作业,从 Source 开始开始调度,其他顶点延迟调度)调度
找到了符合条件的 slot, 进行分配
resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
将 verticesToDeploy 变成 List 集合
if (this.equals(required)) {\t\t\treturn true;\t\t}
slot.isMatchingRequirementrequired代表亲请求
将 ExecutionVertex 封装成 ExecutionVertexSchedulingRequirements
for(ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
return startNewWorker(workerResourceSpec);
创建一个请求对象
Standalone模式
申请Slot待调度执行的 ExecutionVertex 集合
通过一个专业的 ExecutionSlotAllocator slot申请器 来申请 Slots
Standalone模式 并不会創建新的TaskExecutor
给每一个 ExecutionVertex 构建一个 Handler 用于部署: DeploymentHandle
既然申请成功了,则从 freeSlots 集合中删除该 申请到的 Slot
* 注释: 三个步骤: * 1、首先初始化一个容器,用来存储申请到的 SlotExecutionVertexAssignment * 2、遍历待申请slot的 ExecutionVertex 集合: executionVertexSchedulingRequirements, 依次执行 slot 申请 * 3、处理申请结果: 如果申请到,最终申请到的是: LogicalSlot * 在申请 slot 过程中的两种关于 slot 的抽象: * 1、LogicalSlot 逻辑slot * 2、PhysicalSlot 物理slot * 共享 slot 的概念 * 可能存在多个Task公用一个slot的情况: * 事实上,每个去申请slot的Task,都能申请到一个LogicalSlot * 但是有可能,多个申请 slot 的 Task 申请到的 LogicalSlot 属于同一个 PhysicalSlot
从 PendingTaskManagerSlot 中选择
启动所有的服务协调组件
根据cpuCores以及各Memory情况判断
跳转
准备将 Slot 申请请求发送给 ResourceManager\t * 1、如果这一次 slot 申请,最终申请到了 slot,则这个slot 会被分配一个 AllocationID\t * \t防止重复申请
Flink源码(1.11.x) Job 提交、部署流程源码分析之 Slot 管理(申请和释放)
* 注释: 如果尝试从可用申请不到 * 调用: requestNewAllocatedSlot() * 暂时没有可用的,如果允许排队的话,可以要求 SlotPool 向 RM 申请一个新的 slot * 如果你是第一次来申请 slot,那么你的 slotpool 一定是没有的 * 如果你去申请的时候,可以从 slotpool 中申请,那么就不走这儿
N
注册一个 JobManagerJobStatusListener
Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);
return requestYarnContainer(workerResourceSpec);
代码流程见:Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行
如果还没有和 ResourceManager 建立连接, 则先把请求存储起来,待建立连接之后,再来处理这些请求
this::getNumberRegisteredSlotsOf
return executionGraph.getAllVertices().get(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()];
遍历每个 ExecutionVertexSchedulingRequirements, 为每个 ExecutionVertex 申请 Slot
slotManager.registerSlotRequest(slotRequest);
实现类
计算 slot 本地性
\tallocateSlotsAndDeploy(SchedulingStrategyUtils.getAllVertexIdsFromTopology(schedulingTopology));
\t.map(this::getExecutionVertex)
调用内部实现
找到一个合适的就返回
schedulingStrategy.startScheduling();
通过 ResourceManagerClient 向 ResourceManager 申请 Container
findMatchingSlot(resourceProfile)
return resourceProfile.isMatching(required);
ExecutionVertexDeploymentOption == ExecutionVertexId
这个 slot 已经被占用了,更新状态
startAllOperatorCoordinators();
slotProviderStrategy.allocateSlot(××××)
调度
if (this.equals(UNKNOWN)) {\t\t\treturn false;\t\t}
YarnResourceManager. startNewWorker(WorkerResourceSpec workerResourceSpec);
if (this.equals(ANY)) {\t\t\treturn true;\t\t}
if(null != jobManagerRegistration) {
后面的代码流程见:Flink 集群启动——从节点(TaskManager)创建流程源码(Standalone模式)中 requestSlot(*******)
请求的保存
向 TaskExecutor 发起 RPC 请求申请 Slot
return singleTaskSlot;
return new ExecutionVertexSchedulingRequirements(***)
尝试为给定的插槽请求分配插槽。 如果没有可用的插槽,则通知资源管理器分配更多资源,并为请求注册超时 Flink 的 ResourceManager 向 YARN 的 ResourceManager 申请更多的 Container 来启动 TaskExecutor
判断申请 slot 的 JobMaster 和 注册的 Job 的 Master 地址是否一样, 如果不一样,则放弃。防止因为 JobMaster 迁移导致申请了双倍的slot导致资源浪费
return new ExecutionVertexSchedulingRequirements.Builder() .×××××× .build();
startSchedulingInternal();
如果为 YARN 模式,并且资源不足的时候,会走这个分支
先从 Free 状态的 Slot 中,寻找合适的 Slot
向 ResourceManager 申请 slot
Y调用 SlotManagerImpl 来申请 slot
fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest)
调用 ResourceManager 的代理对象,来申请 Slot
调用监听的回调方法
acknowledge != null申请成功
OperatorCoordinator 启动
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
resourceActions.allocateResource(defaultWorkerResourceSpec)
从 ExecutionGraph 中获取 待调度的 ExecutionVertex
if(slotAndLocality.isPresent())
从 Free 状态的 Slot 中,寻找合适的 Slot
slotMatchingStrategy\t\t\t.findMatchingSlot()
schedulerNG.startScheduling();
Yarn per job模式
生成 AllocationID
申请 SingleSlot
开始调度
第一步: 先尝试从可用的 slot 中去申请( 先尝试从 SlotPool 可用的 AllocatedSlot 中获取)
先获取 JobID, SlotRequest 中携带 JobID 判断该 Job 是否已经注册过。
请求 ResourceManager 分配资源,通过 ResourceActions#allocateResource(ResourceProfile) 回调进行
获取当前 JobGragh 的所有 OperatorCoordinator喜欢遍历
return executionVertexSchedulingRequirements.stream().map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)\t\t\t.filter(Objects::nonNull).collect(Collectors.toSet());
更改 ExectionVertex 状态
waitForAllSlotsAndDeploy(deploymentHandles);
将每个 ExecutionVertexID 对象 变成 ExecutionVertexDeploymentOption 对象
freeSlots是在TM启动时赋值的
各种映射处理
stashRequestWaitingForResourceManager(pendingRequest);
构建的 DeploymentHandle
if(allocatedSlot.tryAssignPayload(singleTaskSlot))
申请 YARN Container 用于启动一个 TaskExecutor
final AllocationID allocationId = new AllocationID();
申请失败
获取和 TaskManager 的链接
DefaultScheduler.allocateSlotsAndDeploy()
NormalSlotProviderStrategy的实现调用 SchedulerImpl 来申请 Slot
DefaultScheduler.startSchedulingInternal();
waitingForResourceManager:处于等待状态的 slot request (还没有发送请求给 ResourceManager,此时没有和 ResourceManager 建立连接)
申请资源
开始调度执行 DefaultScheduler.startScheduling();
coordinator.start();
请求Slot
申请到了,则封装一个 SingleLogicalSlot 返回
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
JobMaster.startScheduling()();
.map(ExecutionVertexSchedulingRequirementsMapper::from)
Y
如果请求成功,则取消 pendingSlotRequest,并更新 slot 状态 PENDING -> ALLOCATED
DefaultExecutionSlotAllocator.allocateSlotsFor()
Set<AllocationID> allPreviousAllocationIds = computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
资源申请好了真正的开始部署task
入参taskManagerSlot是申请到的资源
return freeSlots.stream().filter(slot -> slot.isMatchingRequirement(requestedProfile)).findAny();
\texecutionGraph.registerJobStatusListener(jobStatusListener);
N 申请失败
jobStatusListener = new JobManagerJobStatusListener();\t\tschedulerNG.registerJobStatusListener(jobStatusListener);
\tprepareExecutionGraphForNgScheduling();
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
* 注释: startNewWorker 和 stopWorker 这两个抽象方法是实现动态申请和释放资源的关键。 * 对于 Standalone 模式而言, TaskExecutor 是固定的,不支持动态启动和释放;而对于在 Yarn 上运行的 Flink, * YarnResourceManager 中这两个方法的具体实现就涉及到启动新的 container 和释放已经申请的 container。 *
allocateSlotsAndDeploy(totalSlot); 参数为总并行度,这样子就知晓总共需要申请多少 Slot 了 从参数就能看出来,这些都是被封装好的,可以直接被调度执行的 DefaultExecutionVertex
调用 SlotPoolImpl 来申请 slot
更新 Slot 的状态
\treturn taskManagerRegistration.getNumberRegisteredSlots();
资源配置
StandaloneResourceManager. startNewWorker(WorkerResourceSpec workerResourceSpec);
freeSlots.remove(taskManagerSlot.getSlotId());
List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = new ArrayList<>(executionVertexSchedulingRequirements.size());
发送请求,申请slot
return slots.size();
* 1、ExecutionVertexID* 2、ExecutioinGraph* 3、根据以上两个条件找到 ExecutionVertex, 然后封装成 ExecutionVertexDeploymentOption * executionVertexDeploymentOptions.size() = Task Numbers
TaskManagerSlot 状态变为 PENDING
* 1、申请到了 slot* 2、构件好了 Handler* 3、执行部署
TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();\t\tTaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
改变 Job 状态
final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
ExecutionVertex == ExecutionVertexSchedulingRequirements
开启新的 TaskExecutor, 如果是 YARN 的会去做,如果是 StandAlone 的话就不會創建新的TaskExecutor
ExecutionVertexId ==ExecutionVertex
开始调度执行JobMaster 调度 StreamTask 去运行
* 参数内部执行一下操作: * ExecutionVertexDeploymentOption ==> ExecutionVertexId ==>ExecutionVertex ==> ExecutionVertexSchedulingRequirements
executionSlotAllocator.allocateSlotsFor()
for(OperatorCoordinatorHolder coordinator : coordinators)
internalRequestSlot(pendingSlotRequest)
startScheduling()
registerJobMetrics();
将 PendingTaskManagerSlot 指派给 PendingSlotRequest
\tjobStatusListeners.add(listener);
pendingTaskManagerSlotOptional = allocateResource(resourceProfile)
获取 ExecutionGraph 中的 ExecutionVertex
executionSlotAllocator.allocateSlotsFor(\t\t\texecutionVertexDeploymentOptions.stream()\t\t\t\t.map(ExecutionVertexDeploymentOption::getExecutionVertexId)\t\t\t\t.map(this::getExecutionVertex)\t\t\t\t.map(ExecutionVertexSchedulingRequirementsMapper::from)\t\t\t\t.collect(Collectors.toList()));
transitionToScheduled(verticesToDeploy);
计算待申请的 Slot 的个数
JobID jobId = slotRequest.getJobId();\t\tJobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
首先从 FREE 状态的已注册的 slot 中选择符合要求的 slot,刚才 ResourceManager 通过调用: SolotManagerImpl的 findMatchingSlot() == 逻辑计算(找出一个 最合适的 free 状态的 slot),如果代码走第一个分支,那么方法的返回结果中,必然包含: SlotID 和 TaskExecutorID
接:Flink源码——Job 提交、部署流程源码分析之构建ExecutionGraph
return false;
如果有已经有可用的了,就创建一个 SingleLogicalSlot,并作为 AllocatedSlot 的payload
resourceManagerGateway.requestSlot(jobMasterId,new SlotRequest)
return requestNewAllocatedSlotInternal(pendingRequest).thenApply(Function.identity());
if (required.equals(UNKNOWN)) {\t\t\treturn true;\t\t}
0 条评论
下一页
为你推荐
查看更多