Flink 集群启动——从节点(TaskManager)创建流程源码(Standalone模式)
2022-04-08 17:16:55 22 举报
Flink 集群启动——从节点(TaskManager)创建流程源码(Standalone模式)
作者其他创作
大纲/内容
N处理故障
final JobLeaderService jobLeaderService = new DefaultJobLeaderService()
slot.getState()
TaskManagerRunner.main
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
启动入口 * ResourceID: Flink集群: 主节点 从节点, 每个节点都有一个全局唯一的ID
notifyLeaderAddress()
完成注册
执行注册
调用注册
初始化进行回调处理的 线程池
4
\tthis.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
初始化一个 AkkaRpcServiceBuilder
完成 Slot 注册循环 (initialSlotReport封装了taskManagerServicesConfiguration.getNumberOfSlots()个slot)for(SlotStatus slotStatus : initialSlotReport)
获取 NettyShuffleEnvironmentConfiguration 对象
Supervisor 是一个 Actor\t * 最终总结是这样的: 我们启动一个 commonRpcServices 内部启动一个 ActorSystem, 这个 ActorySystem 启动一个 ActorSystem
开始注册
startRegistrationTimeout();
supervisor = startSupervisorActor();
KvStateServerImpl.start();
返回: FileChannelManagerImpl
创建一个 AkkaRpcService 实例返回, 在 AkkaRpcService 内部会初始化一个 SupervisorActor
port = (Integer) actorSystemAddress.port().get();
* 注释: 与 ResourceManager 建立连接,然后添加监听:ResourceManagerLeaderListener 监听 RM 的变更 * 启动 ResourceManagerLeaderListener, 监听 ResourceManager 的变更 * TaskManger 向 ResourceManager 注册是通过 ResourceManagerLeaderListener 来完成的, * 它会监控 ResourceManager 的 leader 变化, 如果有新的 leader 被选举出来, * 将会调用 notifyLeaderAddress() 方法去触发与 ResourceManager 的重连 * - * 1、ZooKeeperLeaderRetrievalService = resourceManagerLeaderRetriever
9
TaskExecutor.heartbeatFromResourceManager
初始化 FileCache
重链
return new DefaultJobTable();
reportHeartbeat(heartbeatOrigin);
* 注释: 调度 TaskManager 的下线\t\t\t * heartbeatListener = ResourceManagerHeartbeatListener\t\t\t * 如果代码真正的能执行到这儿,就证明连续 5 次 ResourceManager 发送过来的心跳请求\t\t\t * 当前这个 TaskExecutor 都没有收到了, 但是正常情况下, ResourceManager 肯定是存活的呀\t\t\t * 所以: 发起跟 ResourceManager 的重新链接
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
初始化 RpcService
N 重试注册
2
TaskExecutor 接收到 ResourceManager 的心跳请求
1
TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);taskManagerServicesConfiguration=*******
AkkaRpcServiceUtils.createRemoteRpcService()
这个类是 TaskManager执行器的实现, 主要负责执行多个 TaskManager当前构造方法执行完了之后,执行 onStart() 方法,因为 TaskExecutor 是一个 RpcEndpoint
返回newRegistration
构建 MemoryManager 实例
5
Configuration configuration = loadConfiguration(args);
heartbeatMonitor.reportHeartbeat();
PermanentBlobCleanupTask.run()
Y更新Slot的状态\t\t\t * 1、slot已经被分配了\t\t\t * 2、没有匹配的 PendingTaskManagerSlot(slot申请请求)
添加监听器
初始化 MemoryManager 实例对象
final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
return akkaRpcServiceBuilder.createAndStart();
检查资源是否配置,如果没有默认值,则必须配置
状态管理服务
server启动
创建注册对象, 并且在创建成功之后,进行 Slot 汇报
Files.delete(localFile.toPath());
执行心跳汇报
如果上面的依然失败,继续尝试注册
初始化 BroadCastVariableManager = BroadcastVariableManager
完成启动
初始化 TaskExecutorLocalStateStoresManager
遍历生成 SlotStatus 对象
return ShuffleServiceLoader.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())\t\t\t.createShuffleEnvironment(shuffleEnvironmentContext);
异步建立和 ResourceManager 的链接,然后进行 slot 汇报
nodeChanged()
没slot申请
向 ResourceManager 注册 TaskExecutor
更新 job 在该节点上申请的 Slot 数量
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
关于FlinK 的主从节点的心跳:\t * 1、首先启动 RM, 启动 HeartbeatManager, 每10s中,针对哪个注册的 TaskExecutor(遍历Map)\t * 执行 发送心跳请求\t * 2、再启动 TaskExecutor, 首先启动 超时检查任务(5min)\t * 启动好了之后,会进行注册,接收到心跳请求请求之后,也就相当于 RM 和 TE 之间就维持了正常的心跳。\t * 3、TaskExecutor 每次接收到 RM 的心跳请求之后,就重置自己的超时任务
taskManagerRegistration.occupySlot();
只要是 TaskExecutor 注册上线了,必然会进行 slot汇报,\t * 汇报的时候,所有的 slot 都执行注册,其实就是被管理在这个 slots 集合中
返回 TaskManager 汇报成功消息
if(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID()))
启动一定数量的(按目录个数) ReaderThread
入参
taskManagerRunner.start();
初始化插件
reconnectToResourceManager(\t\t\t\t\tnew TaskManagerException(String.format(\"The heartbeat of ResourceManager with id %s timed out.\
* 简单来说: ResourceManager 告诉 TaskExecutor 说,你应该把 slotid 的 Slot 分配给 JobID 的 Job 去使用\t\t\t * 先在 TaskExecutor 上,自己先登记,该 Slot 已经被使用
通知 RM 的变更leaderListener = ResourceManagerLeaderLisnterin TaskExecutor
closeResourceManagerConnection(cause);
connectToResourceManager();
new ShuffleEnvironmentContext()
Flink 集群监控
final SlotReport slotReport = new SlotReport(slotStatuses);
启动一定数量的 WriterThread 和 ReaderThread
ZooKeeperLeaderRetrievalService
启动监听
* 注释: 初始化 BlobCacheService\t\t * 其实内部就是启动两个定时任务,用来定时执行检查,删除过期的 Job 的资源文件。\t\t * 通过 引用计数(RefCount) 的方式,来判断是否文件过期(JVM)\t\t * 1、主节点其实启动了一个 BlobServer\t\t * 2、从节点:BlobCacheService\t\t * 其实有两种具体的实现支撑:\t\t * 1、永久的\t\t * 2、临时的
HardwareDescription 硬件抽象对象
return connectionManager.start();
保存: 哪个 Slot 被分派给了哪个 TaskSlot
state = State.ALLOCATED;
this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());
完成 TaskManager 注册
this.writers = new WriterThread[tempDirs.length];
timerService.start(this);
生成一个 SotReport 报告
Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);address = actorSystemAddress.host().get();
分配 Slot
N 有关联的 PedningSlotRequest,则这个 request 可以被满足,分配 slot
TransientBlobCleanupTask.run()
创建 TaskExecutor 实例\t\t * 内部会创建两个重要的心跳管理器:\t\t * 1、JobManagerHeartbeatManager\t\t * 2、ResourceManagerHeartbeatManager\t\t * -\t\t * 这里才是 初始化 TaskManagerRunner 最重要的地方!
resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
this.readers = new ReaderThread[tempDirs.length];
final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
创建 RM 的地址信息
启动 TimerService(超时检测服务)
* 注释: 记住这种代码结构:\t\t\t * 1、ResourceManagerLeaderListener 是 LeaderRetrievalListener 的子类\t\t\t * 2、NodeCacheListener 是 curator 提供的监听器,当指定的 zookeeper znode 节点数据发生改变,则会接收到通知\t\t\t * 回调 nodeChanged() 方法\t\t\t * 3、在 nodeChanged() 会调用对应的 LeaderRetrievalListener 的 notifyLeaderAddress() 方法\t\t\t * 4、resourceManagerLeaderRetriever 的实现类是: LeaderRetrievalService的实现类:ZooKeeperLeaderRetrievalService,\t\t\t * 它是 NodeCacheListener 的子类\t\t\t * 5、resourceManagerLeaderRetriever 进行监听,当发生变更的时候,就会回调:ResourceManagerLeaderListener 的 notifyLeaderAddress 方法
链接 ResourceManager
extends
N这个 slot 已经被分配了
final JobTable jobTable = DefaultJobTable.create();
if(acknowledge != null) {
3
final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);
初始化 actorSystem
如果还有空闲资源则封装一个 TaskSlot 对象
* 启动 TaskManager 的 RPCServer 服务* 接收 TaskManager启动好了而之后, 进行注册和心跳,来汇报 Taskmanagaer 的资源情况* 通过动态代理的形式构建了一个Server
删除永久文件组件
final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
10
ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
heartbeatListener.notifyHeartbeatTimeout(resourceID);
获取 TaskExecutor 的 ResourceID
Y注册次数累加1,继续注册
实例
构建 ResultPartitionFactory
taskManager = startTaskManager(****)
internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
启动 NettyServer
主机名称
numberFreeSlots--;
7
启动一定数量的 WriterThread
kvStateService.start();
初始化两个组件\t\t * 1、PermanentBlobCache\t\t * 2、TransientBlobCache
重新调度 50s 后就执行这个延迟调度任务,执行 TaskManager 的下线操作
埋下钩子 方便关闭
onTaskManagerRegistration(workerTypeWorkerRegistration);
真正完成注册维持心跳 = monitorTarget
初始化 TaskEventDispatcher * Flink 运行的是 流式任务: StreamTask (OperatorChain Pipline 输入给我,调用 Transformation执行,输出结果)* TaskEventDispatcher 负责 从 消费Task 发送 Task消费结果 给 上游生产Task * 任务事件分派器分派从消耗任务向产生消耗结果的任务倒流的事件。* 向后事件仅适用于产生流水线结果的任务,也就是说生产Task和消费Task同时运行的时候
this.handler = initializeHandler();
if(pendingTaskManagerSlot == null)
申请成功
启动
创建线程池,执行查询处理
启动 SupervisorActor
返回: NettyConnectionManager
返回NettyShuffleEnvironment
启动 TaskSlotTable
初始化 JobTable = DefaultJobTable
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
注释: 向 TaskExecutor 发起 RPC 请求申请 Slot\t\t * 1、gateway: 代表 TaskExecutor\t\t * 2、slotId: 代表的是 TaskExecutor 中的某一个 Slot\t\t * 3、pendingSlotRequest.getJobId() 对应的 JobMaster
HeartbeatManagerImpl jobManagerHeartbeatManager
Y这个 slot 还没有被分配,则找到和当前 slot 的计算资源相匹配的 PendingTaskManagerSlot
NettyConfig nettyConfig = config.nettyConfig();
final int listeningDataPort = shuffleEnvironment.start();
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
构建 Shuffle 上下文对象
11
haServices 基于 ZK 的实现的
Y
this.server = new NettyServer(nettyConfig);
开启服务
Y集群其启动不走这里报告 Slot 的状态
heartbeatListener.retrievePayload(requestOrigin)
if(failure != null && !isCanceled())
提供一个 Slot 给 JobManager(JobMaster)
节点信息发生变化是回调说明
onRegistrationSuccess(result.f1);
NPendingTaskManagerSlot(slot申请请求) 可能有关联的 PedningSlotRequest
6
通过一个线程来启动 TaskManager
初始化 ioExecutor
* {@link ShuffleEnvironment} 的实现基于 Netty 网络通信,本地内存和磁盘文件。 * 网络环境包含跟踪所有中间结果的数据结构,并进行随机数据交换。* 对应一个 SingleInputGate, 产出一个 ResultPartition
new ResultPartitionManager()
Slot 汇报
createShuffleEnvironment(shuffleEnvironmentContext)
如果没资源
this.bootstrap = new ServerBootstrap().bind().sync();
重新申请slot
extendsRpcEndpoint构建了一个Server
加载配置参数
if(assignedPendingSlotRequest == null)
ALLOCATED
8
超时重连跳转
链接 ResourceManager
构建 SingleInputGateFactory
registration successful!
attemptToBind(portIterator.next())
* taskManagerServices = TaskManagerServices\t\t * 里头初始化了很多很多的 TaskManager 在运行过程中需要的服务\t\t * 在这儿 TaskManager 启动之前,已经初始化了一些服务组件,基础服务,\t\t * 这里面创建的服务,就是 TaskManager 在运行过程中,真正需要的用来对外提供服务的 各种服务组件
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
启动TaskManager
12
如果超时需重连
new NettyShuffleEnvironmentConfiguration()
启动并调度一个定时服务删除该 Job 的工作目录
通过反射拿到 NettyShuffleServiceFactory 对象
解析 main方法参数 和 flink-conf.yaml 配置信息
监听 Leader 的变更, 如果变更发生,则对应的监听器,收到通知
当 RM 变更的时候,执行 RM 的重连
this.rpcServer = rpcService.startServer(this);
synchronized(lock) {\t\t\tleaderListener = listener;\t\t\tclient.getUnhandledErrorListenable().addListener(this);\t\t\tcache.getListenable().addListener(this);\t\t\tcache.start();\t\t\tclient.getConnectionStateListenable().addListener(connectionStateListener);\t\t\trunning = true;\t\t}
checkTaskExecutorResourceConfigSet(config);
启动一个定时任务清楚TM上文件
Slot 汇报, 发送给 ResourceManager
PENDING
获取到newWorker后构造一个 WorkerRegistrationhardwareDescription是硬件抽象
实现
createSlotReport中根据taskManagerServicesConfiguration.getNumberOfSlots()的个数来创建对应的SlotID放在slotStatuses里面最后生成一个 SotReport 报告
注册 Slot
heartbeatTarget.receiveHeartbeat(getOwnResourceID(),TaskExecutorHeartbeatPayload)
发送心跳
final ExecutorService terminationFutureExecutor = Executors\t\t\t.newSingleThreadExecutor(new ExecutorThreadFactory(\"AkkaRpcService-Supervisor-Termination-Future-Executor\"));
初始化 JobLeaderService
FileUtils.deleteDirectory(localFile);
handleFreeSlot(slot);
* 注释: taskSlots的结构 * 0 =====> TaskSlot * 1 =====> * 2 =====> * 3 =====> TaskSlot * 4 =====> * 5 =====> TaskSlot
启动 Job, 并不是一个真正的 Job,而是一个代表是否有链接存在
注册
if(allocationId == null)
构建 TaskManager 实例
client启动
创建: NettyShuffleEnvironment
ZooKeeperLeaderRetrievalService.nodeChanged()
重试注册
PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());
if(null != allocationId)
取消之前的 延迟调度任务
* 注释: 注册\t\t * 从现在开始计时,如果倒计时 5min 中,到了,还没有注册成功,则意味着注册超时\t\t * 延时调度: 5min 调度执行
* 重要的四件事情: * 1、监控 ResourceManager * (a)、链接 ResourceManager * (b)、注册 * (c)、维持心跳 * (d)、当前 TaskExecutor 也会监控 RM 的变更 * 2、启动 TaskSlotTable 服务 * 3、监控 JobMaster * 4、启动 FileCache 服务
* 主节点主动!\t\t\t * 1、taskExecutorResourceId\t\t\t * 2、HeartbeatTarget\t\t\t * 针对每一个 TaskEXecutor 都有一个唯一的 HeartbeatTarget 对象!
创建TaskManagerServices
tryConnectToResourceManager();
监控networkBufferPool
初始化 StateRootDirectory 和 StateRootDirectoryFile
this.queryExecutor = createQueryExecutor();
创建一个 NettyShuffleEnvironment
* 注释: 完成注册\t\t\t * 延迟 五分钟,执行 registrationTimeout() 检查是否超时
if slots.containsKey(slotId)remove the old slot first
Flink源码——Job 提交、部署流程源码分析之 Slot 管理(申请和释放)流程中公共代码
获取这个 jobID 对应的 job 的临时 blob 文件对TaskManager 上的旧数据的删除
return new TaskExecutor()
TaskExecutor 注册成功之后,会进行 Slot 汇报
reconnectToResourceManager(\t\t\tnew FlinkException(String.format(\"ResourceManager leader changed to new address %s\
newRegistration.startRegistration();
从 SlotManager 中移除旧的 TaskManager
bindFuture = bootstrap.bind().syncUninterruptibly();
初始化文件系统
判断是否成功
启动入口
Y没有关联的 PedningSlotRequest,则将 slot 是 Free 状态
生成一个: ActorSystemScheduledExecutorAdapter
初始化一个 NettyServer
启动 Netty Server 端引导程序处理handler
链接新的 ResourceManager
final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
创建 Handler = KvStateServerHandler
如果有资源null != pendingSlotRequest
TaskManager 是 Flink 的 worker 节点,它负责 Flink 中本机 slot 资源的管理以及具体 task 的执行。TaskManager 上的基本资源单位是 slot,一个作业的 task 最终会部署在一个 TM 的 slot 上运行,TM会负责维护本地的 slot 资源列表,并来与 Flink Master 和 JobManager 通信。根据以上的脚本启动分析:TaskManager 的启动主类: TaskManagerRunner
if(result instanceof RegistrationResponse.Success)
间隔调度
端口号
TaskExecutor.new ResourceManagerLeaderListener()
createNettyShuffleEnvironment()
启动过程中,启动了 Netty 服务端 和 客户端,负责 IO
this.client = new NettyClient(nettyConfig);
startTaskExecutorServices();
ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(***)
触发回调
Y可以分配到资源更新状态,完成申请allocationId和JobID关联
ZooKeeperLeaderRetrievalService.start()
绑定主机名称和端口号传入参数akkaRpcServiceBuilder
rpcServer.start();
初始化一个 NettyClient
if(heartbeatTarget != null)
onStart()
生成 TaskSlotTableImpl
完成注册\t\t\t * 延迟 五分钟,执行 registrationTimeout() 检查是否超时
pendingTaskManagerSlot = null;
移除旧的注册对象
bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);
关闭和原有的 ResourceManager 的链接
Flink源码(1.11.x) Flink 集群启动——从节点(TaskManager)创建流程源码(Standalone模式)
offerSlotsToJobManager(jobId);
JobLeaderService 用于监听 Master。如果 Master 节点改变,会通知 JobLeaderService,内部是以 jobId 为 key 保存 LeaderRetrievalService 和 JobManagerLeaderListener
通知 RM 的变更leaderListener = ResourceManagerLeaderLisnter
调用注册成功的监听
注册
返回: NetworkBufferPool
shuffleEnvironment = NettyShuffleEnvironment* 上游 StreamTask 和 下游 StreamTask 有 shuffle 动作。* 在这个动作过程中,肯定需要很多的一些组件来为其服务, 现在创建的这个 NettyShuffleEnvironment* 对象为将来的 shuffle 提供各种组件创建的支撑
resourceManagerLeaderRetriever.start()
创建和启动 AkkaRpcService
创建TaskManagerConfiguration
HeartbeatManagerImpl resourceManagerHeartbeatManager
更新 Slot 状态
删除临时文件组件
返回 TaskExecutor 注册成功消息
启动网络链接管理器
里面 初始化两个 Map
HeartbeatMonitorImpl.run
* 注释: 当监听响应的时候,就会自动调用 这个方法,等同于 zk 中的 watcher 中的 process 方法\t * 这是属于 curator 的知识\t * 如果 ResourceManager 发生变更过, 或者你是第一次启动
AbstractServerBase.start();
actorSystem = BootstrapTools\t\t\t\t\t.startLocalActorSystem()
重设心跳超时相关的 时间 和 延迟调度任务
checkTaskExecutorNetworkConfigSet(config);
* 注释: 初始化 HeartbeatServices * 1、TaskExecutor 这个组件是一定会启动的 * 2、JobMaster JobLeader Flink Job的主控程序,类似于Spark的Driver * 这两个组件,都需要个 ResourceManager 维持心跳 * 将来不管有多少个需要发送心跳的组件,都可以通过 heartbeatServices 来创建对应的心跳组件 */
* 注释: 不管是那种集群的注册: * 1、主节点启动注册服务 * 2、从节点启动 * 3、先封装自己的信息,通过 RPC 发送给 主节点 * 4、主节点处理注册,其实就是给这个从节点,分配一个 全局唯一的 ID (从节点启动的时候就已经生成好了), * 再通过 ID --> Registrition 注册对象的 这种映射关系保持在 主节点的内存当中。 * 5、如果是HBase, 除了 regionserve 上线之后,想 hmaster 注册以外, 还会把自己的信息,写到 ZK 里面
cancelTimeout()
创建 MemoryManagermemoryPageSize默认 32KB
向 TaskExecutor 发起 RPC 请求申请 Slot
创建注册对象, 并且在创建成功之后,进行 Slot 汇报 * 返回的结果是: * 1、TaskExecutor注册: TaskExecutorToResourceManagerConnection.ResourceManagerRegistration * 2、JobManager(JobMaster)注册: DefaultJobLeaderService.JobManagerRetryingRegistration * - * TaskEXecutorRegistrition ---> RetryingRegistration(ResourceManagerRegistration) * - * 重点: * 1、创建 注册对象: TaskExecutorToResourceManagerConnection.ResourceManagerRegistration * 2、进行 Slot 汇报: onRegistrationSuccess(result.f1);
TaskExecutorToResourceManagerConnection.onRegistrationSuccess(result.f1);
TaskExecutor 和 ResourceManager 之间的链接对象
解析配置文件,获取到资源配置
0 条评论
下一页