Flink 集群启动——启动脚本分析及主节点(JobManager)创建流程源码(Standalone模式)
2022-04-08 16:39:41 19 举报
Flink 集群启动——启动脚本分析及主节点(JobManager)创建流程源码(Standalone模式)
作者其他创作
大纲/内容
2
initialize();
JobMaster
RpcEndpoint
选举完成,执行ResourceManager的实现
创建
* 注释: 主节点中的三个重要的组件: * 1、ResourceManager * 2、Dispatcher * 3、WebMonitorEndpint * 启动的时候, 都会进行选举,通过选来来触发服务的启动
构建 DispatcherResourceManagerComponent
见:Flink源码——Job 提交、部署流程源码分析之提交jobGraph文件以及Job资源到集群
FutureUtils.assertNoException(previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
this.rpcServer = rpcService.startServer(this);
jobGraphWriter.putJobGraph(jobGraph);
Dispatcher.onStart()
从请求中获取文件:包含 JobGraph 序列化文件
入参
new SessionDispatcherLeaderProcess(jobGraphStoreFactory.create())
resourceManagerRetrievalService 启动
tryAcceptLeadership(newLeaderSessionID)
extendssuper(configuration)
开启 RM 服务
stopDispatcherLeaderProcess()
startJM
初始化 actorSystem
创建 Dispatcher 的 Gateway
如果recoveredJobs有待恢复的数据,恢复 JobGraghs
得到一个 StandaloneResourceManager 实例对象
clusterComponent = dispatcherResourceManagerComponentFactory .create()
dispatcherGatewayRetriever = new RpcGatewayRetriever<>()
onStart();
startServices();
引导程序初始化, 把所有中断的 job 恢复执行
执行上传
FencedRpcEndpoint
集群启动入口
第一个参数:ZooKeeperLeaderElectionService 第三个参数:SessionDispatcherLeaderProcessFactoryFactory
创建 StandaloneResourceManager 实例对象,创建重要组件ResourceManager
cache.getListenable().addListener(this);\t\t\tcache.start();
startDispatcherServices()
registerTaskExecutorMetrics();
HaServicesJobGraphStoreFactory.create()
开启定时任务
执行选举
(standalonesession) #注释: JobManager 的启动主类 StandaloneSessionClusterEntrypoint CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint ;;
从 JobGraghFile 文件中,反序列化回来 JobGragh 对象
创建重要组件并启动 Dispatcher
提交job时创建
* 注释: 遍历每一个 TaskExecutor 出来,然后发送 心跳请求\t\t\t * 每一次 TaskEXecutor 来 RM 注册的时候, 在注册成功之后,就会给这个 TaskEXecutor 生成一个\t\t\t * Target, 最终,这个 Target 被封装在 : Monitor,\t\t\t * 最终,每个 TaskEXecutor 对应的一个唯一的 Monitor 就被保存在 heartbeatTargets map 中\t\t\t * RM 在启动的时候,就已经启动了: TaskManagerHeartbeamManager\t\t\t * 这个组件的内部: 启动了一个 HearBeatManagerSenderImpl 对象。\t\t\t * 内部通过一种特别的机制,实现了一个 每隔 10s 调度一次 该组建的额 run 运行\t\t\t * 最终的效果;\t\t\t * RM 启动好了之后,就每隔10s 钟,向所有的已注册的 TaskEXecutor 发送心跳请求\t\t\t * 如果最终,发现某一个 TaskExecutor 的上一次心跳时间,举例现在超过 50s\t\t\t * 则认为该 TaskExecutor 宕机了。 RM 要执行针对这个 TaskExecutor 的注销
通过 BlobClient 上传 jar 等资源到 BlobServer
读取配置文件,获取所有的TM的节点
创建实例
TMWorkers start
createNewDispatcherLeaderProcess(leaderSessionID);
AbstractDispatcherLeaderProcess.start()
提供对 JVM 执行环境的访问的实用程序类,如执行用户(getHadoopUser())、启动选项或JVM版本
初始化MetricFetcher
* leaderContender = JobManagerRunnerImpl * leaderContender = ResourceManager * leaderContender = DefaultDispatcherRunner * leaderContender = WebMonitorEndpoint * * leaderElectionService.start(this); * leaderContender = this
dispatcherRunner
slotRequestTimeoutCheck = scheduledExecutor\t\t\t.scheduleWithFixedDelay()
调用对象: DefaultDispatcherGatewayServiceFactory这里创建Dispather
bootstrap = new DefaultDispatcherBootstrap(recoveredJobs);
taskmanager.sh
ClusterEntrypoint
获取 依赖 jar
1
actorSystem = BootstrapTools .startLocalActorSystem()
leaderElectionService.start(this);
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
* 注释: 第一步\t\t\t * 创建一个 Akka rpc 服务 commonRpcService: 基于 Akka 的 RpcService 实现。\t\t\t * RPC 服务启动 Akka 参与者来接收从 RpcGateway 调用 RPC\t\t\t * commonRpcService 其实是一个基于 akka 得 actorSystem,其实就是一个 tcp 的 rpc 服务,端口为:6123\t\t\t * 1、初始化 ActorSystem\t\t\t * 2、启动 Actor
负责管理集群插件,这些插件是使用单独的类加载器加载的,以便它们的依赖关系,不要干扰 Flink 的依赖关系。
以上,主节点上的 WebMonitorEndpoint 组件的 Netty 服务端起好了任务提交的时候: 启动 Netty 的客户端
jobGraphStoreFactory.create()
* 三种; * 1、本地 Local 客户端的时候会用 JobGragh ===> JobGraghFile * 2、HDFS FileSytem(DistributedFileSystem) * 3、封装对象 HadoopFileSystem, 里面包装了 HDFS 的 FileSYSTEM 实例对象
StandaloneResourceManagerFactory.createResourceManager()
创建一个 SupervisorActor
上传 jar
FutureUtils.assertNoException(runJob(recoveredJob).handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
SessionDispatcherLeaderProcessFactory.create(leaderSessionID);
create()声明若干变量
ClusterEntrypoint.createDispatcherResourceManagerComponentFactory()
构建一个 StandaloneDispatcher 返回
* 注释: 第七步: ArchivedExecutionGraphStore: 存储 execution graph 的服务, 默认有两种实现,\t\t\t * 1、MemoryArchivedExecutionGraphStore 主要是在内存中缓存,\t\t\t * 2、FileArchivedExecutionGraphStore 会持久化到文件系统,也会在内存中缓存。\t\t\t * \t这些服务都会在前面第二步创建 DispatcherResourceManagerComponent 对象时使用到。\t\t\t * \t默认实现是基于 File 的
DispatcherRestEndpoint.initializeHandlers()
registerDispatcherMetrics(jobManagerMetricGroup);
* 注释: 第五步 * 初始化一个心跳服务 * 在主节点中,其实有很多角色都有心跳服务。 那些这些角色的心跳服务,都是在这个 heartbeatServices 的基础之上创建的 * 这才是真正的 心跳服务的 提供者。 * 谁需要心跳服务,通过 heartbeatServices 去提供一个实例 HeartBeatImpl,用来完成心跳* 注释: 获取心跳的两个关键参数: * 1、心跳间隔时间 heartbeatInterval(heartbeat.interval = 10000) * 2、心跳超时时间 heartbeatTimeout(heartbeat.timeout = 50000) * 注释: 启动心跳服务 new HeartbeatServices
注释: 第二步,初始化一个 ioExecuto
startServicesOnLeadership();
dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
ResourceManagerFactory.createResourceManager()
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
配置安全相关配置:securityContext = NoOpSecurityContext
再启动一个新的,调用: AbstractDispatcherLeaderProcess.start()
ClusterEntrypoint.runCluster()
//注释: 注意这个 this 对象 //注释: 执行选举,成功之后,调用 leaderElectionService.isLeader() //注释: this = ResourceManager
ZooKeeperHaServices 启动
heartbeatServices = createHeartbeatServices(configuration);
jobLeaderIdService = new JobLeaderIdService()
webMonitorEndpoint.start();
初始化 SlotManagerImpl
3
slotManager = new SlotManagerImpl()
//注释: 用于 ResourceManager leader 选举 //注释: resourceManagerRetrievalService = ZooKeeperLeaderRetrievalService
internalSubmitJob(jobGraph);
resourceManagerFactory .createResourceManager()
当执行完毕这个构造方法的时候,会触发调用 onStart() 方法执行
* 创建一个 DispatcherLeaderProcess* 调用对象: SessionDispatcherLeaderProcess* 返回值:SessionDispatcherLeaderProcess
用来收听: JobManager 的心跳
executor = WebMonitorEndpoint\t\t\t\t.createExecutorService()
ZooKeeperHAServices.getJobGraphStore()
EnvironmentInformation.logEnvironmentInfo
初始化一个 AkkaRpcServiceBuilder
SignalHandler.register(LOG);
开启定时任务: executionGraphCache.cleanup()
* 注释: 初始化一个 DefaultDispatcherResourceManagerComponentFactory 工厂实例\t\t\t * 内部初始化了四大工厂实例\t\t\t * 1、DispatcherRunnerFactory = DefaultDispatcherRunnerFactory\t\t\t * 2、ResourceManagerFactory = StandaloneResourceManagerFactory\t\t\t * 3、RestEndpointFactory(WenMonitorEndpoint的工厂) = SessionRestEndpointFactory\t\t\t * 返回值:DefaultDispatcherResourceManagerComponentFactory\t\t\t * 内部包含了这三个工厂实例,就是三个成员变量\t\t\t * -\t\t\t * 再补充一个:dispatcherLeaderProcessFactoryFactory = SessionDispatcherLeaderProcessFactoryFactory
选举完成,执行Dispatcher的实现DefaultDispatcherRunner
性能监控的
用于 Dispatcher leader 选举
选举完成,执行WebMonitorEndpoint的实现
抽象cpu、内存等
提交
commonRpcService=AkkaRpcServiceUtils .createRemoteRpcService(****)
ClusterEntrypoint.runClusterEntrypoint()
(taskexecutor) # 注释: TaskManager 的启动主类 TaskManagerRunner CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner ;;
通过 BlobClient 来上传 jar 资源和依赖 jar 和 jobGraph
extends
上传
补充主图1:commonRpcService()流程
leaderLatch.addListener(this);\t\t\tleaderLatch.start();
new JobSubmitHandler()
再创建SessionDispatcherLeaderProcess
返回jobGraph
DefaultDispatcherRunnerFactory.createDispatcherRunner()
dispatcherRunnerFactory\t\t\t\t.createDispatcherRunner()
加入当前 ZooKeeperJobGraphStore 实例的缓存中,如果存在,则替换
HighAvailabilityServicesUtils .createHighAvailabilityServices()
上传: JobGraph + 程序jar + 依赖 jar
dispatcher.start();
得到 jar
集群没用leader时选举
// 注释:创建 ZooKeeperHaServices\t\t\t\t// 注释: 对象的内部,包装了一个 ZooKeeper 的实例对象\t\t\t\t// 注释: 内部的 zk 代码实现: 用的 curator
* 注释: 第六步: metrics(性能监控) 相关的服务\t\t\t * 1、metricQueryServiceRpcService 也是一个 ActorySystem\t\t\t * 2、用来跟踪所有已注册的Metric
jobManagerHeartbeatManager = heartbeatServices\t\t\t.createHeartbeatManagerSender()
创建 StandaloneSessionClusterEntrypoint
用于 ResourceManager leader 选举
调用 startInternal()state = State.RUNNING;DispatcherRunner启动完成
request.getUploadedFiles()
StandaloneSessionClusterEntrypoint.Main
HeartbeatManagerSenderImpl构造方法
Supervisor 是一个 Actor .最终总结是这样的: 我们启动一个 commonRpcServices 内部启动一个 ActorSystem, 这个 ActorySystem 启动一个 Actor
Dispatcher.runJob()创建了JobMaster
在 Flink 的心跳机制中,跟其他的 集群不一样:\t\t * 1、ResourceManager 发送心跳给 从节点 Taskmanager\t\t * 2、从节点接收到心跳之后,返回响应
启动 SupervisorActor
匹配启动主类
获取配置
补充主图3:JobSubmitHandler()流程 * 注释: JobSubmit Handler 任务提交处理器 * 将来客户端提交应用程序上来,由 JobManager 中的 Netty 服务端的 JobSubmitHandler 来执行处理备注1:StreamGraph----->JobGraph(存成文件反序列上传到TM上)------->ExecutionGraph---->物理执行图备注2:上传Jar、文件是通过BlobServer(extends Thread)来处理,它是 Flink 用来管理二进制大文件的服务,Flink JobManager 中启动的 BlobServer 负责监听请求并派发线程去处理任务
run()
初始化 JobLeaderIdService
new StandaloneSessionClusterEntrypoint(configuration);
选举完成,执行JobManagerRunnerImpl的实现
启动 DispatcherRestEndpoint
启动
根据配置初始化文件系统
先停掉已有的
对传入的参数进行解析
dispatcherGatewayServiceFactory.create()
恢复执行之后,清空
return ResourceProfile.newBuilder().setCpuCores(workerResourceSpec.getCpuCores().divide(numSlotsPerWorker)) .setTaskHeapMemory(workerResourceSpec.getTaskHeapSize().divide(numSlotsPerWorker)) .setTaskOffHeapMemory(workerResourceSpec.getTaskOffHeapSize().divide(numSlotsPerWorker)) .setManagedMemory(workerResourceSpec.getManagedMemSize().divide(numSlotsPerWorker)) .setNetworkMemory(workerResourceSpec.getNetworkMemSize().divide(numSlotsPerWorker)).build();
确认 Leader
获取 JobGraph 的序列化文件的存储地
readMasters
启动关键组件:Dispatcher 和 ResourceManage
resourceManager.start();
taskManagerTimeoutCheck = scheduledExecutor\t\t\t.scheduleWithFixedDelay()
创建和启动 AkkaRpcService
handlers = initializeHandlers(restAddressFuture);
在WebMonitorEndpoint中启动 Netty 网络通信 服务端引导程序
Dispatcher
运行: createDispatcherIfRunning()
创建线程池,用于执行 WebMonitorEndpoint(内部启动了一个netty,注册了60几个Handle,来监听用户的提交) 所接收到的 client 发送过来的请求
dispatcher.runRecoveredJob(recoveredJob)
start方法完成,回调isleader()方法
实现循环发送心跳的效果\t\t 1、心跳时间:10s钟 2、心跳超时时间:50s钟
流程见:Flink源码——Job 提交、部署流程源码分析之构建ExecutionGraph
request.getRequestBody();
第三步、HA service 相关的实现(参考:补充主图2)
补充主图2:createHaServices()流程
clusterEntrypoint.startCluster();
父类中,异步初始化超级多一大堆 处理器(60××××××Handler)JobSubmitHandler 参照补充图3
super(configuration)
ResourceManager.onStart()
ResourceManager
handlers = super.initializeHandlers(localAddressFuture);
createHaServices()
ResourceManager 给 目标发送(TaskManager 或者 JobManager) 心跳
初始化 ResourceManagerRuntimeServices
config.sh
调用 runJob 运行一个任务
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
AkkaRpcService.startServer();
ZooKeeperJobGraphStore.putJobGraph();
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>()
AkkaRpcServiceUtils\t\t\t\t.createRemoteRpcService()
获取 HA 模式下,存储状态数据的 HDFS 路径获取文件系统类型,一般是 HDFS,返回值类型是: HadoopFileSystem,它是一个 Hadoop FileSystem 的包装对象
* 注释: 启动 Dispatcher的 RPCServer 服务* 这里启动的是 Dispatcher的 Rpc 服务端。* 通过动态代理的形式构建了一个Server
开启心跳服务
WebMonitorEndpoint.startInternal();
* 注释: 返回 SessionDispatcherLeaderProcess\t\t * 返回值:SessionDispatcherLeaderProcess\t\t * 第三个参数的调用对象: jobGraphStoreFactory = HaServicesJobGraphStoreFactory\t\t * \t创建的结果是:ZooKeeperJobGraphStore\t\t * 第三个参数:jobGraphStoreFactory.create() = ZooKeeperJobGraphStore
startNewDispatcherLeaderProcess(leaderSessionID)
flink-daemon.sh
return highAvailabilityServices.getJobGraphStore();
恢复执行 待恢复的 Job
拿到请求体
readWorkers
RestServerEndpoint.start();
创建HA服务
生成资源配置
ENTRYPOINT=taskexecutor
该 函数 的定义在 config.sh 脚本中,Start TaskManager instance(s)
实现
初始化各种 Handler,包括: JobSubmitHandler
启动集群的entrypoint
启动 Netty 服务端
用来收听: TaskManager 的心跳
创建 ResourceManager 实例入参resourceManagerRuntimeServices
4
new ZooKeeperHaServices()
第六步: metrics(性能监控) 相关的服务
supervisor = startSupervisorActor()
* 注释:这个方法主要是做两件事情:\t * 1、initializeServices() 初始化相关服务\t * 2、dispatcherResourceManagerComponentFactory.create() 启动 Dispatcher 和 ResourceManager 服务。
启动 Dispatcher 服务
StandaloneDispatcher
注册关闭钩子
启动完成回到上层类的方法
JvmShutdownSafeguard.installAsShutdownHook(LOG);
SessionClusterEntrypoint
启动 JobManager 有可能有多个
循环执行心跳检测
DefaultDispatcherResourceManagerComponentFactory.create()
createResourceManagerRuntimeServices();
创建 DispatcherdispatcherFactory = SessionDispatcherFactory
创建 ResourceManagerRuntimeServices 实例
调用DispatcherRestEndpoint的方法
entrypointClusterConfiguration = commandLineParser.parse(args);
PluginUtils.createPluginManagerFromRootFolder(configuration);
开启第一个定时任务: checkTaskManagerTimeouts, 检查 TaskManager 的心跳
第四步: 初始化一个 BlobServer
webMonitorEndpoint = restEndpointFactory.createRestEndpoint()
createDispatcherIfRunning()
绑定主机名称和端口号
返回 ZooKeeperJobGraphStore 实例
handleRequest()
new StandaloneResourceManager()
构建 SessionDispatcherLeaderProcess 实例
\tBlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
启动 Flink 主节点: JobManager
SecurityContext securityContext = installSecurityContext(configuration);
start-cluster.sh
开启 Dispatcher 服务
返回Dispatcher
conf/workers配置
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
startInternal();
第五步,初始化一个心跳服务
checkTaskManagerTimeouts
ZooKeeperLeaderElectionService.start()
上传完成
PermanentlyFencedRpcEndpoint
创建一个 AkkaRpcService 实例返回, 在 AkkaRpcService 内部会初始化一个 SupervisorActor
jobmanager.sh
创建 WebMonitorEndpoint 实例(WebMonitorEndpoint内部启动了一个netty,注册了60几个Handle,来监听客户端以及UI端的提交)
ResourceManager.grantLeadership((UUID leaderSessionID)
* 1、启动 Netty 服务端\t\t\t * 2、选举\t\t\t * 3、启动定时任务 ExecutionGraphCacheCleanupTask
slotManager.start()
执行runCluster方法逻辑
startResourceManagerServices();
恢复得到 JobGragh * 由此可见: 服务端接收到客户端提交的,其实就是一个 JobGragh * 到此为止: 我们可以认为:客户端终于把 JobGragh 提交给 JobManager 了。 最终由 JobSubmitHandler 来执行处理
第七步: ArchivedExecutionGraphStore: 存储 execution graph 的服务
JobManager启动、创建完成
开启第二个定时任务: checkSlotRequestTimeouts, 检查 SlotRequest 超时处理
WebMonitorEndpoint
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
actorRef
Dispatcher.runJob(jobGraph);创建了JobMaster
registerSlotManagerMetrics()
ResourceManager
见:Flink源码——Job 提交、部署流程源码分析之构建ExecutionGraph
从 JobGraghFile 文件中,反序列化回来 JobGragh 对象
conf/masters配置
// 注释: 用于 Dispatcher leader 选举// 注释: dispatcherLeaderRetrievalService = ZooKeeperLeaderRetrievalService
registerShutDownFuture();
leaderElectionService.start(dispatcherRunner);
保存 JobGraph,jobGraphWriter = ZooKeeperJobGraphStore
\t\t\t * 注释: 创建 StandaloneResourceManager 实例对象\t\t\t * 1、resourceManager = StandaloneResourceManager\t\t\t * 2、resourceManagerFactory = StandaloneResourceManagerFactory
akkaRpcServiceBuilder.createAndStart();
this.rpcServer = rpcService.startServer(this);
ResourceManagerRuntimeServices\t\t\t.fromConfiguration(***)
注册一些信号处理
// TODO_MA 注释: 检索当前leader并进行通知一个倾听者的服务\t\tLeaderRetrievalService dispatcherLeaderRetrievalService = null;\t\t// TODO_MA 注释: 检索当前leader并进行通知一个倾听者的服务\t\tLeaderRetrievalService resourceManagerRetrievalService = null;\t\t// TODO_MA 注释: 服务于web前端Rest调用的Rest端点\t\tWebMonitorEndpoint<?> webMonitorEndpoint = null;\t\t// TODO_MA 注释: ResourceManager实现。资源管理器负责资源的分配和记帐\t\tResourceManager<?> resourceManager = null;\t\t// TODO_MA 注释: 封装Dispatcher如何执行的\t\tDispatcherRunner dispatcherRunner = null;
ZooKeeperLeaderElectionService.isLeader()
resourceManager 启动
MetricFetcher metricFetcher =××××××
WebMonitorEndpoint.grantLeadership((UUID leaderSessionID)
new DefaultDispatcherRunner()
SessionDispatcherLeaderProcess.createDispatcher()
Fink 的 选举,和 HBase 一样都是通过 ZooKeeper 的 API 框架 Curator 实现的 * 1、leaderLatch.start(); 事实上就是举行选举 * 2、当选举结束的时候: * 如果成功了: isLeader() * 如果失败了: notLeader()
leaderElectionService = ZooKeeperLeaderElectionService
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
* 注释: 创建 并启动 Dispatcher\t\t\t * 1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager\t\t\t * 2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory\t\t\t * 第一个参数: ZooKeeperLeaderElectionService\t\t\t * -\t\t\t * 老版本: 这个地方是直接创建一个 Dispatcher 对象然后调用 dispatcher.start() 来启动\t\t\t * 新版本: 直接创建一个 DispatcherRunner, 内部就是要创建和启动 Dispatcher
Flink 集群启动脚本分析(Standalone模式)Flink 集群的启动脚本在:flink-dist 子项目中,位于 flink-bin 下的 bin 目录:启动脚本为:start-cluster.sh该脚本会首先调用 config.sh 来获取 masters 和 workers,masters 的信息,是从 conf/masters 配置文件中获取的, workers 是从 conf/workers 配置文件中获取的。然后分别:1、通过 jobmanager.sh 来启动 JobManager 2、通过 taskmanager.sh 来启动 TaskManager他们的内部,都通过 flink-daemon.sh 脚本来启动 JVM 进程,分析 flink-daemon.sh 脚本发现:1、JobManager 的启动代号:standalonesession,实现类是: StandaloneSessionClusterEntrypoint2、TaskManager 的启动代号:taskexecutor,实现类是:TaskManagerRunnerHDFS集群一样的方式:1、start-all.sh 2、hadoop-daemon.sh start namenode/datanode/zkfc/journalnode 3、java org.apache.hadoop.server.namenode.NameNode
Flink源码(1.11.x) Flink 集群启动——启动脚本分析及主节点(JobManager)创建流程源码(Standalone模式)
结合到:Task部署、Task间数据交换、Task(线程)复用 runJob方法
启动选举
* 注释: 启动选举 * 参数:dispatcherRunner = DefaultDispatcherRunner * 调用对象:leaderElectionService = ZooKeeperLeaderElectionService * 这个选举服务对象 leaderElectionService 内部的 leaderContender 是 : DefaultDispatcherRunner
dispatcherLeaderProcessFactory.create(leaderSessionID);
checkSlotRequestTimeouts()
创建ResourceManager 的 Gateway
* 注释: 启动 ResourceManager 的 RPCServer 服务* 这里启动的是 ResourceManager 的 Rpc 服务端。* 接收 TaskManager启动好了之后,进行注册和心跳,来汇报 Taskmanagaer 的资源情况* 通过动态代理的形式构建了一个Server
第一步, 创建一个 Akka rpc 服务(参考:补充主图1)
recoveredJobs.clear();
Flink 集群的启动脚本在:flink-dist 子项目中,位于 flink-bin 下的 bin 目录:启动脚本为:start-cluster.sh
* 注释: 初始化服务,如 JobManager 的 Akka RPC 服务,HA 服务,心跳检查服务,metric service\t\t\t * 这些服务都是 Master 节点要使用到的一些服务\t\t\t * 1、commonRpcService: \t基于 Akka 的 RpcService 实现。RPC 服务启动 Akka 参与者来接收从 RpcGateway 调用 RPC\t\t\t * 2、haServices: \t\t\t提供对高可用性所需的所有服务的访问注册,分布式计数器和领导人选举\t\t\t * 3、blobServer: \t\t\t负责侦听传入的请求生成线程来处理这些请求。它还负责创建要存储的目录结构 blob 或临时缓存它们\t\t\t * 4、heartbeatServices: \t提供心跳所需的所有服务。这包括创建心跳接收器和心跳发送者。\t\t\t * 5、metricRegistry: \t跟踪所有已注册的 Metric,它作为连接 MetricGroup 和 MetricReporter\t\t\t * 6、archivedExecutionGraphStore: \t存储执行图ExecutionGraph的可序列化形式。
加载配置信息
接收到客户端的请求(SubmitJob)
ClusterEntrypoint.initializeServices()
调用SessionDispatcherLeaderProcess的方法
leaderContender.grantLeadership(issuedLeaderSessionID);
ObjectInputStream objectIn = new ObjectInputStream(jobGraphFile.getFileSystem().open(jobGraphFile))jobGraph = (JobGraph) objectIn.readObject();return jobGraph;
ZOOKEEPER 获取 BlobStoreService
getPathAndAssertUpload()
for(HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);}
1、初始化 ActorSystem2、启动 Actor
* 注释: 第四步: 初始化一个 BlobServer\t\t\t * 主要管理一些大文件的上传等,比如用户作业的 jar 包、TM 上传 log 文件等\t\t\t * Blob 是指二进制大对象也就是英文 Binary Large Object 的缩写
DefaultDispatcherRunner.grantLeadership((UUID leaderSessionID)
startExecutionGraphCacheCleanupTask();
在 Standalone 模式下,什么也没做,空实现
当这里执行完毕的时候,需要执行他的 onStart() 方法
JobManagerRunnerImpl.grantLeadership((UUID leaderSessionID)
* 注释: 创建 WebMonitorEndpoint 实例, 在 Standalone模式下:DispatcherRestEndpoint * 1、restEndpointFactory = SessionRestEndpointFactory * 2、webMonitorEndpoint = DispatcherRestEndpoint * 3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService
执行runCluster方法逻辑集群关闭时的回调
实例化ResourceManager(继承RPC框架RpcEndpoint规范 )后,回调ResourceManager的onStart()
commonRpcService()
ENTRYPOINT=standalonesession
SessionDispatcherLeaderProcess.create();
安装安全关闭的钩子
开启服务: 启动 JobGraghStore, 一个用来存储 JobGragh 的存储组件
bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);
taskManagerHeartbeatManager = heartbeatServices\t\t\t.createHeartbeatManagerSender()
resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
0 条评论
下一页