Flink源码——Job 提交、部署流程源码分析之构建ExecutionGraph
2022-04-08 17:15:28 26 举报
Flink源码——Job 提交、部署流程源码分析之构建ExecutionGraph
作者其他创作
大纲/内容
间隔循环调用
flushAll();
构建 NormalSlotProviderStrategy
* leaderContender = JobManagerRunnerImpl * leaderContender = ResourceManager * leaderContender = DefaultDispatcherRunner * leaderContender = WebMonitorEndpoint * * leaderElectionService.start(this); * leaderContender = this
writeLeaderInformation();
initRegionExecutionViewByVertex();
leaderLatch.addListener(this);\t\t\tleaderLatch.start();
调用每一个 StreamOperator 的 snapshotState 方法生成快照并存储到状态后端。
释放 IntermediateResultPartition 的策略: RegionPartitionReleaseStrategy
遍历到一个 JobEdge
final AbstractInvokable invokable = this.invokable;
return startJobMaster(leaderSessionId);
生成 DefaultJobMasterServiceFactory
new ThrowingRestartStrategy();
调度 并启动 JobMaster
return new RegionPartitionReleaseStrategy.Factory();
启动 jobManagerRunner
发回某个 Task 的 checkpoint ack 反馈
Task 触发 Checkpoint
reconnectToResourceManager(new FlinkException(\"Starting JobMaster component.\"));
TaskExecutor.triggerCheckpoint
2
\tCheckpointMetrics checkpointMetrics = new CheckpointMetrics().setAlignmentDurationNanos(0L);
preCheckGlobalState(request.isPeriodic);
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
preCheckGlobalState(isPeriodic);
如果不存在
客户端重启策略
checkpointStorage.clearCacheFor(checkpointId);
这必须在协调器级锁定之外发生,因为它与外部服务进行通信(在HA模式下),并且可能会阻塞一段时间。
RetrievableStateHandle<T> storeHandle = storage.store(state);
this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
更改PendingCheckpoint 为 CompletedCheckpoint
获取 CheckpointCoordinatorConfiguration
触发 checkpoint 的定时器(线程池)
如果是 Task 正在运行这里依然执行判断, SourceStreamTask 的运行状态,必须是 RUNNING 状态, 而且 invokable 不能为空
for(JobVertex vertex : jobGraph.getVertices()) {vertex.initializeOnMaster(classLoader);}
ejv.connectToPredecessors(this.intermediateResults);
**从 taskSlotTable 中通过 executionAttemptID 获取到一个 Task 对象\t\t * *Task 对应到: Execution, 必然是一个 SourceStreamTask
返回 executionGraph
创建 CheckpointFailureManager,管理 checkpoint 失败后的策略
Flink源码(1.11.x) Job 提交、部署流程源码分析之构建ExecutionGraph
final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
将当前 IntermediateResult 作为 ExecutionJobVertex 的输入 加入 inputs 集合,作为 ExecutionJobVertex 的输入
runningJobsRegistry.clearJob(jobId);
return ackTasks;
scheduler.start(getMainThreadExecutor());
通知 Task
* 注释:ZooKeeperLeaderElectionService = leaderElectionService\t\t\t * 选举成功,则跳转到:ZooKeeperLeaderElectionService 的 isLeader() 方法
* 注释: 提交一个 Mail 到 mainMailboxExecutor 中运行 * 待执行的 checkpoint 被封装成为 Mail 提交给 mainMailboxExecutor 来执行 * - * TaskManager 接收到 JobMaster 的 TriggerCheckpoint 消息后, * 经过层层调用最后使用 AbstractInvokable 的 triggerCheckpointAsync 方法来处理。 * AbstractInvokable 是对在 TaskManager 中可执行任务的抽象。 * triggerCheckpointAsync 的具体实现在 AbstractInvokable 的子类 StreamTask 中, * 其核心逻辑就是使用线程池异步调用 triggerCheckpoint 方法。
启动 JobMasterjobMasterService = JobMaster 实例
启动 JobManagerRunner提交任务 == start JobManagerRunner
启动ck
其实,checkpoint,就是针对所有的 Task State 持久化一次
重启策略
通过 TaskExecutorGateway 发送 RPC 消息给 对应的 TaskManager
\texecutor.submit(request);
一个 JobVertex 对应的创建一个 ExecutionJobVertex
返回souces的ExecutionVertex集合
实现循环发送心跳的效果\t\t 1、心跳时间:10s钟 2、心跳超时时间:50s钟
将 TaskManager 返回来的各种信息封装成 AcknowledgeCheckpoint
if(shutdown)
return new JobManagerRunnerImpl()
见:Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(一) flushAll();代码
currentPeriodicTrigger.cancel(false);
RegionPartitionReleaseStrategy
提交
获取到 JobEdge 链接的 IntermediateResult
因为是 Job 恢复,所以要从最近一次的 快照中,执行 Task 的 state restore
主要作用是把 ExecutionGraph 中的 ExecutionVertex 封装成 待调度的 DefaultExecutionVertex
执行 checkBatchSlotTimeout() 方法
从最近有效完整的 checkpoint 中进行 job 恢复
coordinator 处于 shutdown 状态,取消 checkpoint
* 注释: 获取 CheckpointStorageLocation, 即 checkpoint 持久化时的保存位置。根据选择的 StateBackend,会有以下两种:\t\t\t\t * 1、MemoryBackendCheckpointStorage: MemoryStateBackend 对应,\t\t\t\t * checkpoint 数据一般保存在内存里(如果指定了 checkpoint dir,会把 metadata 持久化到指定地址)\t\t\t\t * 2、FsCheckpointStorage: 对于 FsStateBackend 和 RocksDBStateBackend,\t\t\t\t * 都是要将 checkpoint 持久化数据保存到文件系统(常见的如 HDFS S3等)
返回所有需要 ack 的 Execution 的集合
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
触发 master hook
获取 checkpointID
工作准备就绪,请尝试与资源管理器建立连接
让 ResultPartition 的所有 ResultSubPartition 都执行 flush 动作
进入 checkpoint state restore 流程
如果 PendingCheckpoint 不为空,并且未取消
跳转到 JobMaster 的 onStart()
persistAndRunJob(()
if (isPreferCheckpointForRecovery && allCheckpoints.size() > 1 && lastCompleted.getProperties().isSavepoint()) {
ZooKeeperCheckpointIDCounter
return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
开始调度执行JobMaster 调度 StreamTask 去运行
循环执行心跳检测
获取当前 ExecutionVertex 正在执行的 Execution
如果存在,则替换
获取所有的 Source ExecutionVertex
异步提交 JobMaster 通知 checkpointCoordinator
applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
final boolean partitionReleaseDuringJobExecution = configuration.getBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION);
准备 SnapshotPreBarrier
for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++)
完成 checkpoint
处理 JobEdge 和 IntermediateResult 和 ExecutionJobVertex中的 ExecutionVertex
switch(pattern)
N
启动 JobMaster
* 注释: 分布式模式,有两种: * 1、ALL_TO_ALL 上游每个 Task 跟下游每个 Task 都有链接 * 2、POINTWISE 上游 Task 不一定跟下游每个 Task 都有链接
pre-checks,检查是否满足进行 checkpoint 的条件。
ExecutionGraph:JobManager(JobMaster) 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph是JobGraph 的并行化版本,是调度层最核心的数据结构。它包含的主要抽象概念有1、ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有 和并发度一样多的 ExecutionVertex。2、ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。 3、IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个 IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。 4、IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是 ExecutionVertex,consumer是若干个ExecutionEdge。 5、ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition, target是ExecutionVertex。source和target都只能是一个。 6、Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。源码核心代码入口:ExecutionGraph executioinGraph = SchedulerBase.createAndRestoreExecutionGraph()在 SchedulerBase 这个类的内部,有两个成员变量:一个是 JobGraph,一个是 ExecutioinGraph在创建 SchedulerBase 的子类:DefaultScheduler 的实例对象的时候,会在 chedulerBase 的构造方法中去生成 ExecutionGraph。
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
遍历 Source ExecutionVertex
提交一个 AsyncCheckpointRunnable 任务
最后一个参数是 true
rescheduleTrigger
newExecJobVertices.add(ejv);
创建一个 PendingCheckpoint当 checkpoint 执行完毕之后,就会由 PendingCheckpoint 变成 CompletedCheckpoint
CompletableFuture<Boolean> result = new CompletableFuture<>();
创建 JobManagerRunner(JobMaster)
在 Flink 的心跳机制中,跟其他的 集群不一样:\t\t * 1、ResourceManager 发送心跳给 从节点 Taskmanager\t\t * 2、从节点接收到心跳之后,返回响应
确认是否是 Leader
保存 JobGraph jobGraphWriter = ZooKeeperJobGraphStore
开始调度部署Task部署之前需要先申请资源(Slot)
真正做 checkpoint 触发开始
初始化 StateSnapshotContextSynchronousImpl
初始化一些必要的服务组件JobMaster 的注册和心跳
Y
上游 Task 不一定跟下游每个 Task 都有链接
YisDebugEnabled
int vertexParallelism = jobVertex.getParallelism();int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
\toperatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
代码流程见:Flink 集群启动——从节点(TaskManager)创建流程源码(Standalone模式)
所有顶点都是 commitVertices
1
Execution ee = ev.getCurrentExecutionAttempt();
* 注释: 设置 ExecutionGraph 的一些基本属性 * 1、JsonPlanGenerator.generatePlan(jobGraph) 根据 JobGraph 生成一个 JsonPlan * 2、executionGraph.setJsonPlan(JsonPlan) 把 JsonPlan 设置到 ExecutionGraph
启动 SlotPool 服务当前这个 JobMaster 有一个 SlotPool slot 池子
* 如果 Checkpoint 执行成功,AsyncCheckpointRunnable 最后\t\t\t\t * 会调用 TaskStateManagerImpl 的 reportTaskStateSnapshots 方法向 JobManager 发送 AcknowledgeCheckpoint 消息。\t\t\t
汇报一些监控指标数据信息
把 JobMaster 的 State 数据,按照 OperatorID 的映射方式来保存
* 注释: 创建 CheckpointFailureManager,管理 checkpoint 失败后的策略 * 当由于各种原因 checkpoint 失败时,CheckpointFailureManager 负责进行处理,其中有两个重要的参数: * 1、continuousFailureCounter: checkpoint 连续失败的次数,AtomicInteger 类型,确保每个 checkpoint 只被计算一次 * 2、tolerableCpFailureNumber: 可以容忍的 checkpoint 失败次数,不指定时,默认为 -1。 * continuousFailureCounter 大于该值时,作业会进入作业级别的失败策略
将 Job 状态写入 ZK
运行完毕该 job 就从 jobGraphWriter 移除
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
创建心跳服务: taskManagerHeartbeatManager
初始化 SchedulerBase1、获取 ExecutioinGragh2、获取 OperatorCoordinator
long checkpointID = checkpointIdCounter.getAndIncrement();
this.executionVerticesList.add(schedulingVertex);
final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
for(ExecutionVertex ev : tasksToWaitFor)
当 Job 执行结束之后,删除该 Job 相关的数据
DefaultScheduler
CompletedCheckpoint prev = listIterator.previous(); f (!prev.getProperties().isSavepoint())
遍历每个source的 Execution
条件检查,检查是否满足进行 checkpoint 的条件
告诉 CheckpointCoordinator 如果有一个 Task 不在运行状态,则停止 checkpoint
ZooKeeperLeaderElectionService.isLeader()
Flink Job 重启策略
状态后初始化
completePendingCheckpoint(checkpoint);
resetAndStartScheduler();
LOG.info(\"Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.\
遍历每个 InputChannel
case POINTWISE:
new ExecutionEdge
获取最近一次快照
保存 checkpoint 请求
获取: CheckpointCoordinator
cancelPeriodicTrigger();
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
for(HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);}
生成和启动一个 JobMaster
SchedulerImpl
CheckpointCoordinator
* 注释: 创建 JobManagerRunner\t\t * 在这里面会做一件重要的事情:\t\t * 1、创建 JobMaster 实例\t\t * 2、在创建 JobMaster 的时候,同时会把 JobGraph 编程 ExecutionGraph\t\t * -\t\t * 严格来说,是启动 JobMaster, 那么这个地方的名字,就应该最好叫做: createJobMasterRunner\t\t * Flink 集群的一两个主从架构:\t\t * 1、资源管理: ResourceManager + TaskExecutor\t\t * 2、任务运行: JobMaster + StreamTask
JobGraph 转换成 ExecutionGraph
internalSubmitJob(jobGraph);
调用钩子方法
状态追踪器初始化
startTriggeringCheckpoint
待调度的 ExecutionVertex 加入到带调度 DefaultExecutionVertex 集合中
this.coordinatorMap = createCoordinatorMap();
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeUTF(confirmedLeaderAddress); oos.writeObject(confirmedLeaderSessionID); oos.close();
mainMailboxExecutor.execute()
关于 ExecutionJobVertex 中的 OperatorCoordinatorHolder 的初始化
如果并没有设置并行度的话,会通过计算来确定一个合适的默认值
throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
* 注释: 一个 ExecutionVertex 就对应到 到时候真正执行的 StreamTask 一个 * 正常来说,一个StrewamTask 也需要申请得到一个 Slot
RestartBackoffTimeStrategyFactoryLoader\t\t\t\t.createRestartBackoffTimeStrategyFactory()
final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory = PartitionReleaseStrategyFactoryLoader\t\t\t.loadPartitionReleaseStrategyFactory(jobManagerConfig);
加入容器
CheckpointMetaData 包含 checkpointID 和 checkpoint 时间戳
Y只有 source 顶点会成为 triggerVertices.
返回 JobManagerRunnerImpl\t\t负责启动 JobMaster
随机一段时间之后,开始一个定时调度任务
提交 Checkpoint 请求
将 ExecutionVertex 生成 待调度的 DefaultExecutionVertex
* createJobManagerRunner 方法返回 JobManagerRunnerImpl * 在 JobManagerRunnerImpl 初始化的时候,初始化了一个 JobMaster 对象DefaultJobManagerRunnerFactory。createJobManagerRunner()
if(checkpointOptions.isUnalignedCheckpoint())
triggerCheckpointHelper
3
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
this.verticesInCreationOrder.add(ejv);
如果存在,则替换
ThrowingRestartStrategyFactory
返回一个 defaultScheduler
channelStateWriter.finishOutput(checkpointId);
return executions;
调转
jobGraphWriter.putJobGraph(jobGraph);
运行 job
jobManagerRunner.start();
startHeartbeatServices();
\tthis.scheduler = checkNotNull(schedulerFactory).createScheduler(slotPool);
恢复状态
if(ee != null)
调用 StreaOperator 的 snapshotState 方法生成快照并存储到状态后端。
如果调度已禁用,则不允许定期检查点
final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
final SchedulingStrategyFactory schedulingStrategyFactory = createSchedulingStrategyFactory(jobGraph.getScheduleMode());
启动 JobMaster
比检查点之间的期望时间更频繁地安排检查点没有意义,如果 checkpoint 的时间,超过 checkpoint 的间隔时间,则 checkpoint 无意义
\tcheckpointStorage.initializeBaseLocations();
获取 Leader 信息
初始化一些必要的文件目录
异步 checkpoint
this.restartStrategy = RestartStrategyResolving\t\t\t.resolve()
ompletedCheckpoint lastCompleted = allCheckpoints.get(allCheckpoints.size() - 1);
调用图中,开始初始化return schedulerNGFactory .createInstance(××××××)
if(isPeriodic && !periodicScheduling)
创建 StateBackend
\tthis.taskVertices = new ExecutionVertex[numTaskVertices];
遍历 tasksToWaitFor
invokable = SourceStreamTask
判断,只要该 savePoint 是完整的,就是可用的
如果是 InputFormatVertex 和 OutputFormatVertex, 则可以进行一些初始化\t\t\t * 1、File output format 在这一步准备好输出目录\t\t\t * 2、Input splits 在这一步创建对应的 splits
创建调度策略工厂实例返回值: EagerSchedulingStrategy
创建 CheckpointStore
return snapshotMasterState(checkpoint);
final long checkpointId = message.getCheckpointId();
IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
把 Task 的 State 数据,按照 OperatorID 的映射方式来保存
final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);
startScheduling()
End
将生成好的 ExecutionJobVertex 加入到 ExecutionGraph 中
leaderElectionService.start(this);
生成一个 全局 Checkpoint 时间戳
StandaloneCheckpointIDCounter
startJobExecution(newJobMasterId)
设置 executionGraph 的 checkpoint 相关配置
executionGraph = (prior != null) ? prior : new ExecutionGraph()
返回一个 DefaultScheduler
获取所有需要进行 ack 的 ExecutionVertex
else if(ee.getState() == ExecutionState.RUNNING)
if (flushAlways)
\trunningJobsRegistry.setJobRunning(jobGraph.getJobID());
如果未对齐
SchedulerBase
return executionGraph;
final List<IntermediateResult> inputs;this.inputs.add(ires);
接:Flink 集群启动——启动脚本分析及主节点(JobManager)创建流程源码(Standalone模式)【补充主图3】
构建一个 Checkpoint Request
对 master hooks 拍摄快照
获取最近一个savepoint
* 注释: 根据 checkpointId 和 checkpointStorageLocation 创建 PendingCheckpoint * - * 注意参数: * checkpointIdAndStorageLocation 就是 initializeCheckpoint() 方法的返回值 * - * PendingCheckpoint 代表当前已经开始的 checkpoint,当 CheckpointCoordinator 收到所有 task 对 * 该 checkpoint 的 ack 消息后,PendingCheckpoint 成为 CompletedCheckpoint。 * - * PendingCheckpoint是一个启动但还未被确认的Checkpoint。等到所有Task都确认后又会转化为CompletedCheckpoint。
当前这个 Execution 被发布到那个 TaskManager 上执行
根据并行度来设置 ExecutionVertex由于每⼀个并行度都对应⼀个节点。所以要把每个节点都和前面中间结果相连
AsyncCheckpointRunnable.run
for(ExecutionVertex vertex : graph.getAllExecutionVertices()) {
为 该 Job 创建 CheckpointStorage 路径
executionTopology = new DefaultExecutionTopology(this)
executionGraph.attachJobGraph(sortedTopology);
final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
addedJobGraphs.add(jobGraph.getJobID());
ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1);
startJobManagerRunner()
SourceStreamTask
确保 RPC 工作正常
进行迭代
初始化 ExecutionVertex 数组
restoreStateToCoordinators(operatorStates);
构建 OperatorCoordinator 的 CheckpointContext
如果 operatorStateBackend, 则执行 operatorStateBackend 的 snapshot 动作
返回结果,包含 checkpointID 和 checkpointStorageLocation
创建 ExecutionVertex 对象
chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
状态恢复
currentPeriodicTrigger = null;
ChannelStateWriteResult channelStateWriteResult = checkpointOptions.isUnalignedCheckpoint() ?\t\t\t// TODO_MA 注释: 默认: exactly once\t\t\tchannelStateWriter.getAndRemoveWriteResult(checkpointId) :\t\t\tChannelStateWriteResult.EMPTY;
执行 checkIdleSlot() 方法
初始化 producedDataSets 数组中的每个 IntermediateResult
当前 ExecutionVertex 的 Execution 不为空,意味着就是 RUNNING 状态
空方法
* 注释: 创建一个 CheckpointCoordinator\t\t * 创建 CheckpointCoordinator,并注册 CheckpointCoordinatorDeActivator\t\t * CheckpointCoordinator 是 flink 的一个核心组件,位于 JobManager 进程,用于协调分布式快照和状态的触发与存储\t\t * -\t\t * CheckpointCoordinator 向相关算子(全部 source 算子)发送触发 checkpoint 的消息,\t\t * 并收集每个算子上报的快照完成的 ack 消息,这些 ack 消息包含算子进行快照后的状态句柄,\t\t * CheckpointCoordinator 则对这些状态句柄进行维护;待所有算子都上报 ack 消息后,\t\t * CheckpointCoordinator 将这些元数据信息进行保存(根据选择的 StateBackend 保存在不同的位置)
拿到 checkpoint directory\t\t * 1、checkpointBaseDirectory 父目录\t\t * 2、jobId 子目录
参数默认为 True: jobmanager.partition.release-during-job-execution = true
针对 StreamOperator 执行状态校验
this.executionVerticesList = new ArrayList<>(graph.getTotalNumberOfVertices());
* 启动好了一个 JobMaster\t * 1、JobMaster 需要向: ResourceManager 心跳汇报\t * 2、JobMaster 需要向: TaskManager 维持心态
initializeCheckpoint 方法的主要作用,就是用来生成: checkpointID 和 checkpointStorageLocation
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
for(int i = 0; i < tasksToTrigger.length; i++)
客户端正常提交一个 job 的时候,最终由 集群主节点中的 Dispatcher 接收到来
获取jobvertex 并行度
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
执行 Checkpoint 的初始化
\tcheckpointIDCounter.start();
return new EagerSchedulingStrategy.Factory();
如果 Checkpoint 执行成功,AsyncCheckpointRunnable 最后\t\t * \t会调用 TaskStateManagerImpl 的 reportTaskStateSnapshots 方法向 JobManager 发送 AcknowledgeCheckpoint 消息。
JobManagerRunnerImpl.grantLeadership(issuedLeaderSessionID);
创建一个 CheckpointCoordinator
输出结果集,也就是 IntermediateDataSet 集合
执行 Checkpoint 的执行, 主要做两件事情:\t * 1、创建Checkpoint Barrier并向下游节点广播\t\t\t * 2、触发本节点的快照操作
// 周期性 checkpoint 调度被取消 (periodicScheduling=false), //一般 periodicScheduling=false 时,是因为用户手动触发了 savepoint
ZooKeeperRunningJobsRegistry
使用创建顺序保存的 ExecutionJobVertex
* 注释: 遍历每一个 TaskExecutor 出来,然后发送 心跳请求\t\t\t * 每一次 TaskEXecutor 来 RM 注册的时候, 在注册成功之后,就会给这个 TaskEXecutor 生成一个\t\t\t * Target, 最终,这个 Target 被封装在 : Monitor,\t\t\t * 最终,每个 TaskEXecutor 对应的一个唯一的 Monitor 就被保存在 heartbeatTargets map 中\t\t\t * RM 在启动的时候,就已经启动了: TaskManagerHeartbeamManager\t\t\t * 这个组件的内部: 启动了一个 HearBeatManagerSenderImpl 对象。\t\t\t * 内部通过一种特别的机制,实现了一个 每隔 10s 调度一次 该组建的额 run 运行\t\t\t * 最终的效果;\t\t\t * RM 启动好了之后,就每隔10s 钟,向所有的已注册的 TaskEXecutor 发送心跳请求\t\t\t * 如果最终,发现某一个 TaskExecutor 的上一次心跳时间,举例现在超过 50s\t\t\t * 则认为该 TaskExecutor 宕机了。 RM 要执行针对这个 TaskExecutor 的注销
创建 SlotPool
检查是否有 CheckpointCoordinator
FunctionUtils.uncheckedFunction(this::startJobManagerRunner)
需要在 checkpoint 完成后,收到 CheckpointCoordinator “notifyCheckpointComplete” 消息的顶点。
cache.getListenable().addListener(this);\t\t\tcache.start();
\t *第一步\t\t * 向下游发送 Barrier 前,给当前 Task 的每个 operator 进行逻辑处理的机会。\t\t * 这里会调用当前 Task 中所有 operator 的 prepareSnapshotPreBarrier() 方法
HeartbeatManagerSenderImpl构造方法
this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
jobMasterFactory = new DefaultJobMasterServiceFactory()
return jobManagerRunner;
if(checkpointCoordinator != null)
上游每个 Task 跟下游每个 Task 都有链接
分配状态
return getJobRestartStrategyFactory(jobRestartStrategyConfiguration)\t\t\t.orElse(getClusterRestartStrategyFactory(clusterConfiguration)\t\t\t\t.orElse(getDefaultRestartStrategyFactory(isCheckpointingEnabled)));
storage = ZooKeeperStateHandleStore
核心ExecutionGraph 事实上只是改动了 JobGraph 的每个节点,而没有对整个拓扑结构进行变动,挨个遍历 jobVertex 并进行处理
partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
* 注释: checkpointID 用于标识一次 checkpoint,由 CheckpointIDCounter 生成,根据是否开启 HA 模式,有以下两种实现类:\t\t\t\t * 1、StandaloneCheckpointIDCounter: 未开启 HA 模式,实际由 AtomicLong 自增实现\t\t\t\t * 2、开启 HA 模式,使用了 Curator 的 分布式计数器 SharedCount,flink on yarn 模式下,\t\t\t\t * 默认计数保存地址为 /flink/{yarn-app-id}/checkpoint-counter/{job-id}
for(int num = 0; num < inputs.size(); num++)
this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
final Task task = taskSlotTable.getTask(executionAttemptID);
遍历每个 JobEdge
if(advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {\t\t\tthrow new IllegalArgumentException(\"Only synchronous savepoints are allowed to advance the watermark to MAX.\");\t\t}
return serverStrategyFactory.createRestartStrategy();
* 注释: checkpoint\t\t * 注意: 第四个参数: 这是用来决定是采用 同步 checkpoint 还是异步 checkpoint 的核心参数\t\t * 1、false 表示 周期调度,自动调度,异步\t\t * 2、true 表示 手动 savepoint
持续时间之间的最大值”可以为一年-这是为了防止数值溢出
return schedulerNGFactory .createInstance(××××××)
RpcEndpoint
Fink 的 选举,和 HBase 一样都是通过 ZooKeeper 的 API 框架 Curator 实现的 * 1、leaderLatch.start(); 事实上就是举行选举 * 2、当选举结束的时候: * 如果成功了: isLeader() * 如果失败了: notLeader()
* 注释: JobMaster 收到 Task 的 AcknowledgeCheckpoint 消息后,会调用 CheckpointCoordinator 的\t * receiveAcknowledgeMessage 方法来处理。\t * PendingCheckPoint 中记录了本次 Checkpoint 中有哪些 Task 需要 Ack,如果 JobMaster 已经收到所有的 Task 的 Ack 消息,\t * 则调用 completePendingCheckpoint 向 Task 发送 notifyCheckpointComplete 消息通知 Task 本次 Checkpoint 已经完成。
待调度的 DefaultExecutionVertex 的集合初始化
ScheduledTrigger.run
构造一个 HeartbeatManagerImpl 实例返回
MemoryBackendCheckpointStorage
case ALL_TO_ALL:
第二步\t\t * 生成 Barrier 并向下游的ResultSubPartition(当前并行度)广播 checkpoint Barrier 消息,下游 Task 收到该消息后就开始进行自己的 checkpoint 流程
start();
jobGraphWriter.removeJobGraph(jobId);
log.debug(printStatus());
this.inputs = new ArrayList<>(jobVertex.getInputs().size());
List<JobEdge> inputs = jobVertex.getInputs();
当 JobMaster 启动好了之后,更改 Job 状态为 RunningrunningJobsRegistry = ZooKeeperRunningJobsRegistry
返回 ZooKeeperLeaderElectionService
给下游所有的 Task 发送 CheckpointBarrier 消息
获取到 TaskManager 的位置
执行异步 Checkpoint
* 注释:如果 Checkpoint 执行成功,AsyncCheckpointRunnable 最后会调用 TaskStateManagerImpl\t\t\t\t * 的 reportTaskStateSnapshots 方法向 JobManager 发送 AcknowledgeCheckpoint 消息。
异常 如果任务没在运行,则直接回复 DeclineCheckpoint(拒绝Checkpoint)
* 注释: 需要执行 Checkpoint 相关行为的 ExecutionVertex\t\t * 1、tasksToTrigger 对应到 JobGraph 中的: triggerVertices\t\t * 2、tasksToWaitFor 对应到 JobGraph 中的: ackVertices\t\t * 3、tasksToCommitTo 对应到 JobGraph 中的: commitVertices
初始化一个容器,用来存储所有需要进行 ack 的 ExecutionVertex
发送 Checkpoint 消息
throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
内部调用 StreamOperator 的 snapshotState 方法生成快照并存储到状态后端。
执行 checkpoint
* 第三步\t\t * 如果是非对齐 checkpoint\t\t * 准备溢写 in-flight buffers 为了 input 和 output
如果我们注入检查点,则无法对齐
\telse if (addedJobGraphs.contains(jobGraph.getJobID())) {
获取到 所有 checkpoint
startJobMasterServices();
如果 keyedStateBackend, 则执行 keyedStateBackend 的 snapshot 动作\t\t\t * keyedStateBackend = HeapKeyedStateBackend
循环JobGraph 的所有 ExecutionVertex
创建ExecutionEdgesource是IntermediateResultPartition, target是ExecutionVertex。source和target都只能是一个
再次检查
获取 SchedulingStrategyFactoryschedulingStrategy = EagerSchedulingStrategy
判断存在与否
* 注释: 工作准备就绪,请尝试与资源管理器建立连接\t\t * 注册 start() 方法的参数:\t\t * 1、ResourceManagerLeaderListener 是 LeaderRetrievalListener 的子类\t\t * 2、NodeCacheListener 是 curator 提供的监听器,当指定的 zookeeper znode 节点数据发生改变,则会接收到通知\t\t * 回调 nodeChanged() 方法\t\t * 3、在 nodeChanged() 会调用对应的 LeaderRetrievalListener 的 notifyLeaderAddress() 方法\t\t * 4、resourceManagerLeaderRetriever 的实现类是: LeaderRetrievalService的实现类:ZooKeeperLeaderRetrievalService\t\t * 5、resourceManagerLeaderRetriever 进行监听,当发生变更的时候,就会回调:ResourceManagerLeaderListener 的 notifyLeaderAddress 方法
final String jobName = jobGraph.getName();\t\tfinal JobID jobId = jobGraph.getJobID();
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
遍历所有的 JobVertex
需要在 snapshot 完成后,向 CheckpointCoordinator 发送 ack 消息的顶点。
ExecutionJobVertex 加入 newExecJobVertices List 中
JobMaster 链接 ResourceManager主要的目的是为了: 向 ResourceManager 注册该 JobMaster
广播
默认实现: DefaultExecutionTopology
获取该 jobVertex 的所有输入 JobEdge
获取 CheckpointCoordinator
leaderContender.grantLeadership(issuedLeaderSessionID);
获取 ExecutionGraph, 只是创建了一个 ExecutionGraph 对象而已
所有顶点都是 ackVertices
final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup);
实现
long baseInterval = chkConfig.getCheckpointInterval();\t\tif(baseInterval < minPauseBetweenCheckpoints) {\t\t\tbaseInterval = minPauseBetweenCheckpoints;\t\t}
生成待调度的 DefaultExecutionVertex
for(JobVertex jobVertex : topologiallySorted)
代码流程见::Flink源码——Job 提交、部署流程源码分析之 Slot 管理(申请和释放)
OperatorCoordinator 初始化
资源配置
\tList<CompletedCheckpoint> allCheckpoints = getAllCheckpoints();
ZooKeeperLeaderElectionService.start()
启动JobMaster
pattern枚举值
组装一个目录
打印日志
final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators = buildOpCoordinatorCheckpointContexts();
需要 “触发 checkpoint” 的顶点,后续 CheckpointCoordinator 发起 checkpoint 时, 只有这些点会收到 trigger checkpoint 消息。
将当前 ExecutionJobVertex的输入 IntermediateResult 加入到 intermediateResults map 中
创建一个 CheckpointStartRequest 加入队列
加入当前 ZooKeeperJobGraphStore 实例的缓存中
从 JobGraph 中获取 JobName 和 JobID
executions[i] = ee;
return new RegionPartitionReleaseStrategy(schedulingStrategy);
setMaxParallelismInternal(maxParallelismConfigured ? configuredMaxParallelism :\t\t\tKeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices)\t\t);
isTriggering = true;
case SUCCESS:
\tfinal TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
总并行度
最后一个参数是false
if(executionState == ExecutionState.RUNNING && invokable != null)
long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();\t\tif(minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {\t\t\tminPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;\t\t}
当前 ExecutionJobVertex 的输入集合
advanceToEndOfEventTime=true
this.runningJobsRegistry = haServices.getRunningJobsRegistry();
JobEdge edge = inputs.get(num);
if(snapshotSettings != null) {
if(advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {\t\t\treturn FutureUtils.completedExceptionally(\t\t\t\tnew IllegalArgumentException(\"Only synchronous savepoints are allowed to advance the watermark to MAX.\"));\t\t}
获取 Checkpoint StorageLocation
return ExecutionGraphBuilder .buildGraph()
初始化channelStateWriter = ChannelStateWriterImpl
创建 ExecutionGraph注意第一个参数:null
if(options.isUnalignedCheckpoint()) {\t\t\tprepareInflightDataSnapshot(metadata.getCheckpointId());\t\t}
如果有一个 Source ExecutionVertex 没有运行中的 Execution 的时候,则 checkpoint 直接失败
final Execution[] executions = getTriggerExecutions();
将 该 StreamOperator 的状态更新到 StateTable * 1、SourceOperator\t\t\t * 2、AbstractUdfStreamOperator
重启策略ThrowingRestartStrategyFactory ThrowingRestartStrategy
* 只有当 advanceToEndOfTime 为 true,意味着是是 手动checkpoint,也就是同步快照,\t\t * 才能将 watermark 标记为 max
获取 FailoverStrategy
将 ExecutionVertex 与 IntermediateResult 关联起来
第四步拍摄快照
Y生成 List 迭代器
for(Execution execution : executions)
schedulingStrategy = DefaultExecutionTopology
this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
int currentVersion = jobGraphsInZooKeeper.exists(path);
final RestartStrategy clientSideRestartStrategy = RestartStrategyFactory.createRestartStrategy(clientConfiguration);
获取 checkpointID
调用 startTriggeringCheckpoint() 触发 Checkpoint + SavePoint
开始调度一个定时任务:取消CheckpointCanceller 用于后续超时情况下的 PendingCheckpoint 清理用于释放资源。
RpcEndpoint.onStart()
JobMaster.onStart()
return prev
发回反馈
重启schedulerNG.requestJobStatus() != JobStatus.CREATED创建 SchedulerNG, 返回 DefaultScheduler
triggerCheckpoint(true);
Y检查我们是否可以从保存点还原
配置 state 的 checkpoint
final DistributionPattern pattern = edge.getDistributionPattern();
获取 CheckpointStorage 相关信息
run()
return request.onCompletionPromise;
启动心跳服务
ResourceManager 给 目标发送(TaskManager 或者 JobManager) 心跳
currentPeriodicTrigger = scheduleTriggerWithDelay(tillNextMillis);
见:Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(一) targetPartition.addBufferConsumer代码
this.numVerticesTotal += ejv.getParallelism();
返回值: RegionPartitionReleaseStrategy
start方法完成,回调isleader()方法
ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);\t\tExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);\t\tExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
如果是正在运行中,则进行 Checkpoint执行 State 的 Checkpoint
加入 ZooKeeper 中
Y同步 Checkpoint\t\t\t\t * 如果是手动同步,则是同步 checkpoint
FsCheckpointStorage
先获取最近一个
根据 checkpointID 获取 PendingCheckpoint
写入 ZooKeeper 中
his.schedulerNG = createScheduler(jobManagerJobMetricGroup);
long checkpointId = checkpointMetaData.getCheckpointId();\t\tlong started = System.nanoTime();
接收到 ack 成功消息
if(checkpoint != null && !checkpoint.isDiscarded())
启动 Schduler 服务
设置InputFormatVertex 和 OutputFormatVertex
if (currentVersion == -1)
final long timestamp = System.currentTimeMillis();
* 处理 JobEdge 和 IntermediateResult 和 ExecutionJobVertex中的 ExecutionVertex * 对每个 JobEdge,获取对应的 IntermediateResult,并记录到本节点的输入上 * 最后,把每个 ExecutorVertex 和对应的 IntermediateResult 关联起来
获取 ExecutionGraph
构建包含 job 信息的 JobInformation 对象
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
if(props.isSynchronous())
默认EAGER
设置 ExecutionGraph 的一些基本属性
0 条评论
下一页
为你推荐
查看更多