MapReduce MRAppMaster, MapTask, and ReduceTask execution flowchart
2021-08-19 15:43:17
Outline/Content
Task
assignMapsWithLocality(allocatedContainers)
handle(JobSetupCompletedEvent)
SetupCompletedTransition
RequestContainerTransition
ScheduledRequests
Req > Max Alloc
handle(TaskAttemptContainerAssignedEvent)
createSplits()
handleEvent(Container_Req)
Create launchContext
Read phase
heartbeat()
JOB_START
serviceInit
initAndStartAppMaster
maps&reduces=0
committer.setupJob(): create the temporary output directory
handle(Speculator#ATTEMPT_START)
CONTAINER_REMOTE_LAUNCH
Add the request to remoteRequestsTable and ask
Initialize Job
TaskAttempt
Filter out and release containers that do not meet the allocation conditions
InitFailedTransition
ContainerLauncher
handle(TASK_CONTAINER_NEED_UPDATE)
NodeManager
RMContainerAllocator
ClientService
Thread - AllocatorRunnable
main
lostTaskCheckerThread
assignContainers(allocatedContainers)
scheduleTasks(reduces)
StartTransition
Limit the number of concurrently running map/reduce tasks: mapreduce.job.running.map.limit, mapreduce.job.running.reduce.limit
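The effect of the two limit properties can be illustrated with a small calculation. The following is a simplified sketch, not the Hadoop implementation: the class `ConcurrentLimitSketch` and the method `maxNewRequests` are hypothetical names introduced here. It assumes that a limit of 0 or less means "no limit", and that the allocator may only ask for as many containers as the remaining headroom under the limit allows.

```java
/** Hypothetical illustration of a running-task limit; not a Hadoop class. */
final class ConcurrentLimitSketch {

    private ConcurrentLimitSketch() {
    }

    /**
     * Computes how many scheduled requests may still be asked for.
     *
     * @param limit     value of the running-task limit; 0 or negative is treated as "no limit"
     * @param assigned  number of tasks that already hold a container
     * @param scheduled number of requests waiting to be assigned
     * @return number of scheduled requests that fit under the limit
     */
    static int maxNewRequests(int limit, int assigned, int scheduled) {
        if (limit <= 0) {
            // No limit configured: every scheduled request may be asked for.
            return scheduled;
        }
        // Headroom left under the limit, never negative.
        int headroom = Math.max(0, limit - assigned);
        return Math.min(headroom, scheduled);
    }
}
```

For example, with a limit of 10, 4 assigned tasks, and 20 scheduled requests, the sketch allows 6 further requests.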
Merge phase
setup()
Remove the resource request that is about to be assigned from the resource request list
MRAppMaster
getResources()
CommitterEventHandlerEventProcessor
assignWithoutLocality(allocated)
handle(T_ATTEMPT_LAUNCHED)
Update speculative execution
Set startTime
Create Job: createJob()
handle(CONTAINER_REQ)
handle (CommitterJobSetupEvent)
create map/reduce num
TA_SCHEDULE
T_SCHEDULE
CONTAINER_REQ
c.launch(launchEvent)
Periodically check whether any task has timed out
JOB_INIT
set map/reduce num
handle(JOB_COMPLETED)
handle(CONTAINER_REMOTE_LAUNCH)
TA_CONTAINER_LAUNCHED
Job.kill
TaskAttemptListener
TA_ASSIGNED
Read split information
ContainerAssignedTransition
Decide whether the job runs in Uber mode
Map
Spill phase
Collect phase
addMap(ContainerRequestEvent)
decContainerReq(assigned)
makeUberDecision()
taskAttemptListener.registerLaunchedTask()
addContainerReq(request)
EventProcessor
ContainerAllocator
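Vocabulary used:
pending: requests not yet sent to the RM
scheduled: requests sent to the RM but not yet assigned
assigned: requests assigned to a container
completed: requests whose container has completed
Map lifecycle: scheduled -> assigned -> completed
Reduce lifecycle: pending -> scheduled -> assigned -> completed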
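The two lifecycles in the vocabulary can be read as a small state machine. The sketch below is only an illustration of that forward path: `RequestLifecycleSketch`, its enums, and its methods are hypothetical names, not Hadoop classes, and it deliberately ignores any backward moves or failure handling the real allocator may perform.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical illustration of the request lifecycle vocabulary; not a Hadoop class. */
final class RequestLifecycleSketch {

    /** States named after the vocabulary in the diagram. */
    enum State { PENDING, SCHEDULED, ASSIGNED, COMPLETED }

    /** Task kinds; map requests skip the PENDING state. */
    enum Kind { MAP, REDUCE }

    private RequestLifecycleSketch() {
    }

    /** Map requests start as SCHEDULED, reduce requests start as PENDING. */
    static State initialState(Kind kind) {
        return kind == Kind.MAP ? State.SCHEDULED : State.PENDING;
    }

    /** Returns the states reachable in one forward step from the given state. */
    static Set<State> next(State state) {
        switch (state) {
            case PENDING:
                return EnumSet.of(State.SCHEDULED);
            case SCHEDULED:
                return EnumSet.of(State.ASSIGNED);
            case ASSIGNED:
                return EnumSet.of(State.COMPLETED);
            default:
                // COMPLETED is terminal in this sketch.
                return EnumSet.noneOf(State.class);
        }
    }

    /** True when the sketch allows moving directly from one state to the other. */
    static boolean canTransition(State from, State to) {
        return next(from).contains(to);
    }
}
```

In this sketch a reduce request must pass through SCHEDULED before it can be ASSIGNED, while a map request begins in SCHEDULED.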
Create MapTask/ReduceTask
Job
makeRemoteRequest()
ContainerLauncher
committerEventHandler
Thread eventHandlingThread
Request a container for the task
ScheduledRequests.addMap(req)
LaunchedContainerTransition
Reduce
assign(allocatedContainers)
handle(TA_CONTAINER_LAUNCHED)
setupProgress = 1.0f
taskAttempt.createRemoteTask()
Add to attempts
scheduleTasks(maps)
JOB_SETUP_COMPLETED
handleJobSetup(event)
ContainerRequest
attemptID: TaskAttemptId
capability: Resource
hosts: String[]
racks: String[]
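The four ContainerRequest fields listed above can be pictured as a plain value holder. This is a minimal sketch under stated assumptions: `ContainerRequestSketch` and `prefersHost` are hypothetical names, a `String` stands in for `TaskAttemptId`, and two integers (memory and virtual cores) stand in for the `Resource` capability so that the example compiles without Hadoop on the classpath.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the ContainerRequest node in the diagram; not a Hadoop class. */
final class ContainerRequestSketch {
    final String attemptId;  // stands in for TaskAttemptId
    final int memoryMb;      // stands in for the memory part of the Resource capability
    final int vcores;        // stands in for the CPU part of the Resource capability
    final String[] hosts;    // preferred hosts
    final String[] racks;    // preferred racks

    ContainerRequestSketch(String attemptId, int memoryMb, int vcores,
                           String[] hosts, String[] racks) {
        this.attemptId = attemptId;
        this.memoryMb = memoryMb;
        this.vcores = vcores;
        // Defensive copies so later changes to the caller's arrays do not leak in.
        this.hosts = hosts.clone();
        this.racks = racks.clone();
    }

    /** True when the request names the given host as a preferred location. */
    boolean prefersHost(String host) {
        return Arrays.asList(hosts).contains(host);
    }
}
```

A request built with hosts `{"node1", "node2"}` would report `prefersHost("node1")` as true and `prefersHost("node3")` as false.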
OutputCommitter
InitialScheduleTransition
Create TaskAttempt
TaskHeartbeatHandler
Scheduler
Map phase
containerMgrProxy.startContainers(launchEvent)
pendingReduces.add(req)
startJobs()
serviceStart
applyConcurrentTaskLimits()
Set on the job: remoteJobSubmitDir, remoteJobConfFile