Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(一)
2022-04-08 16:38:09 15 举报
Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(一)
作者其他创作
大纲/内容
resumeInternal()
判断是否有ck的数据
subpartition.flush();
partitionWriter.close();
output.collect(record[0]); splitState.set(record[1] + 1);
入参
调用用户自定义的算子处理
创建 Channel 的 IO 线程池
//当时在构建 ExecutorGraph 的时候,会帮我们把每一个 ExecutorVertex 的启动类都会初始化好,设置在设置在 ExecutorVertex 里面//Slot ===> Task ===> ExecutorVertex ===> 启动类
if(!jobId.equals(jobInformation.getJobId()))
StreamOneInputProcessor.processInput
if(status == InputStatus.END_OF_INPUT)
循环创建InputChannel
在生成一个 InputChannel 数组
notEmpty.signal();
ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
获取 ExecutorVertex 的 Operator初始化实例
\tTaskSlot<T> taskSlot = getTaskSlot(allocationId);
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
if(mailbox.getState() == TaskMailbox.State.OPEN)保持状态检查和有异常邮件队列原子
markFailed(failure);
subpartitions[subpartitionIndex].flush();
mailboxProcessor.allActionsCompleted();
当前节点是否已经有 Slot 申请到了
FsStateBackend
序列化原來数据(record)
((RemoteInputChannel) inputChannel).assignExclusiveSegments();
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
N启动的时候走这里
return false;
\tdispatcher.dispatch(deque.take());
执行function的run方法
初始化 SubtaskCheckpointCoordinatorImpl
if(enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {\t\t\t\treturn;\t\t\t}
\tavailabilityListener.notifyDataAvailable();
if(suspendedDefaultAction == this)
taskSlot != null
LocalInputChannel
mailboxProcessor.reportThrowable(sourceThreadThrowable);
return operator.emitNext(output);
closeNetworkResources();
RecordWriterOutput.collect()
RecoveredInputChannel
final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
if (wrapped instanceof BoundedOneInput)
for(ResultSubpartition subpartition : subpartitions)
注册监控
N
for(int gateIndex = 0; gateIndex < inputGates.length; gateIndex++)
\tfinal TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
if(throwable != null)
toNotify.complete(null);
创建 BufferPool
* 提交 Task\t\t\t * taskManagerGateway = RPCTaskManagerGateway\t\t\t * Task -- ExecutionVertex(Execution) --- Slot ---- TaskManager\t\t\t * 真正实现了 从 JobMaster 提交 Task 到 TaskManager(TaskExecutor) 执行!
SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
executionVertexOperations = DefaultExecutionVertexOperations
slotAssigned.handle(deployOrHandleError(deploymentHandle))
启动的时候每个线程状态检查
第三个参数: executor = ChannelStateWriteRequestExecutorImpl
if (bufferOrEvent.isBuffer())
buffers.add(bufferConsumer);
\tserializer.prune();
buffer 写满了,调用 bufferBuilder.finish 方法
for(DeploymentHandle deploymentHandle : deploymentHandles)
for(ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {\t\t\t\ttaskEventDispatcher.registerPartition(partitionWriter.getPartitionId());\t\t\t}
if(mailbox.isMailboxThread())
LegacySourceFunctionThread.run
recordWrites.size() == 1
构建一个 TaskKvStateRegistry 实例
加入待返回 InputGate List
StreamTaskNetworkInput
PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
targetPartition.flush(targetChannel);
if (spanningWrapper.getNumGatheredBytes() > 0)
task.startTaskThread();
\t\tsuper(env);
对上述生成的 ResultPartition 再根据是否需要发回反馈信息等,进行进一步对象的处理
如果 buffers 数量大于1,证明,之前已经执行了 notifyDataAvailable()
assignResourceOrHandleError(deploymentHandle)
通过 deployOrHandleError 来进行部署
一堆:DeploymentHandle
检验 buffer 是否写满
Y cas更新状态
operator.processElement(record);
时间语义服务 初始化
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
ChannelSelectorRecordWriter
获取 ExecutionVertexID
注册输入分区
* 提交到对应的 slot 所在节点的 TaskExecutor 中来执行该 ExecutionVertex,其实已经变成: Task * 关于 Client 提交 Job 到最后变成分布式 Task 物理执行图的所有细节到此为止,结束了。 * 从这以后,就是去到了 TaskManager 中的 TaskExecutor 中来执行 Task 了
initialCredit = bufferManager.requestExclusiveBuffers();
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()\t\t\t.setChannelSelector(outputPartitioner).setTimeout(bufferTimeout).setTaskName(taskName)\t\t\t// TODO_MA 注释: 构建一个 RecordWriter 返回\t\t\t.build(bufferWriter);
释放 Task 运行需要的一些文件资源
IngestionTime
通知数据可用
创建 BufferPoolFactory
waitForAllSlotsAndDeploy(deploymentHandles);
RocksDBStateBackend
初始化一个 ArrayList 容器用来存放创建出来的 RecordWriter
MemoryStateBackend
exactly once
bufferManager.requestExclusiveBuffers();
notifyFinalState();
为运行算子的线程设置类加载器
当构造 Task 具体启动实例候
获取 Operator 的执行上下文对象 根据不同的语义获取不同的 SourceContext
* 一般情况下,我们在生产环境中,会去进行配置,在 flink-conf.yaml 文件中进行配置: * 1、state.backend: filesystem * 2、state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints * 一般有三种方式: * 1、state.backend: filesystem = FsStateBackend * 2、state.backend: jobmanager = MemoryStateBackend * 3、state.backend: rocksdb = RocksDBStateBackend * 也可以在程序中,进行设置: * StreamExecutionEnvironment.setStateBackend(StateBackend backend) 这种方式会覆盖配置文件中的配置
释放网络资源
userFunction.processElement(record);
while1、由 CREATED 改成:DEPLOYING, 然后退出循环继续部署2、FAILED和CANCELING、异常状态返回
N如果也没有配置,则使用默认的: MemoryStateBackend
flush数据
collect(element);
executionVertexOperations.deploy(executionVertex);
consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
* 注释: 获取到代码运行主类 * nameOfInvokableClass 是 JobVertex 的 invokableClassName, AbstractInvokable = invokable * 每一个 StreamNode 在添加的时候都会有一个 jobVertexClass 属性 * 对于一个 operator chain,就是 head operator 对应的 invokableClassName,见 StreamingJobGraphGenerator.createChain * 通过反射创建 AbstractInvokable 对象 * 对于 Stream 任务而言,就是 StreamTask 的子类,SourceStreamTask、OneInputStreamTask、TwoInputStreamTask 等
PipelinedSubpartition.add
分配 buffer
* 1、申请到了 slot* 2、构件好了 Handler* 3、执行部署
serializationBuffer.pruneBuffer();\t\tdataBuffer = serializationBuffer.wrapAsByteBuffer();
if(taskSlot.add(task)
Ysignal check的消息加入 mail 在最前面
* 1、如果上游 StreamNode 和 下游 StreamNode 的并行度一样,则使用: ForwardPartitioner 数据分发策略* 2、如果上游 StreamNode 和 下游 StreamNode 的并行度不一样,则使用: RebalancePartitioner 数据分发策略
recordWrites.size() =其他的情況
返回是否注册成功
return inputGate;
buffers是RecoedWrite寫到Subpartition的是数据
availableChannels = inputChannelsWithData.size();
\tfor(int i = 0; i < outEdgesInOrder.size(); i++)
exclusiveBuffers.add(buffer);
刷新到对应 inputChannel 的 ResultSubPartition 中
for(ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {\t\t\t\tif(partitionWriter != null) {\t\t\t\t\tpartitionWriter.finish();\t\t\t\t}\t\t\t}
notifyChannelNonEmpty();
ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
一个 Task 执行一个 ExecutionVertex
ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();
调用 assignResourceOrHandleError 来获取 申请到的 slot, 有可能获取不到
告诉 MailBox 先暂停 loop
将序列化器中的序列化结果写入目标 channel
StreamTask
InputChannel[] inputChannels = new InputChannel[shuffleDescriptors.length];
if(inputChannelDescriptor.isLocalTo(taskExecutorResourceId))
Y
for(ResultPartitionDeploymentDescriptor desc : descs)
初始化inputGates 数组
执行任务
在Slot上注册 Task
timerService.unregisterTimeout(taskSlot.getAllocationId());
executor.start();
jobTable.getConnection(jobId)
if(taskSlot.markActive())
ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking() ?
deployTaskSafe(executionVertexId);
MailboxDefaultAction.Suspension.resume
\tfor(IntermediateResultPartition partition : partitions)
currentExecution.deploy();
* 注释: 调用 deployAll() 部署任务 * 1、assignAllResources(deploymentHandles) 分配 slot * 2、deployAll(deploymentHandles) 执行任务部署 * 之前只是申请!分配有可能成功,也有可能失败 * - * 把 deploymentHandles 中的每一个元素 和 slotExecutionVertexAssignments 中的每个元素做一一对应
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
根据配置获取 StateBackend
taskAdded = taskSlotTable.addTask(task);
output.collect(record);
state = TaskSlotState.ACTIVE;
int maxParallelism = getPartitionMaxParallelism(partition);
OneInputStreamTask
构造一个 PartitionDescriptor 对象,主要用于创建 ResultPartition
pushToRecordWriter(record);
final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());
* 注释: channelSelector确定目标channel\t\t * channelSelector 的作用,就和 mapreduce 框架中的 Partitioner 是一样的作用:\t\t * 用来决定 record 到底被分发到那个一个分区\t\t * channelSelector.selectChannel(record) = partitioner.getPartition()
注册当前 Task 的 ResultPartition 到启动当前 Task 的 TaskManager 之上的用来跟踪管理 ResultPartition 的 ResultPartitionManager 之中
if (isPriorityEvent)
获取最大并行度
if(taskAdded)
if (copyFromSerializerToTargetChannel(targetChannel))
部署 Task(到时候根据 ExecutionVertexID 来确定 Task)
执行 flush
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
部署一个 ExecutionVertex(Task)
EventTime
RecordWriter
创建 ShuffleIOOwnerContext
return true;
如果 JobID 冲突了,拒绝处理
针对 JobGraph 中的每个 IntermediateResultPartition 将来都会创建一个 ResultPartitioin
run
等待 future 完成后,重置suspendedDefaultAction才能继续 mailbox loop(等待 input 和 output 可用后,才会继续)
添加一个新的BufferConsumer
return new SingleRecordWriter<>(recordWrites.get(0));
inputChannel instanceof RemoteInputChannel
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
* 相似的地方:线程模型: TaskExecutor ==== Executor\t\t\tJVM\t\t * tdd ====== TaskDescriptor\t\tThread\t\t * 不相同的地方:\t\t * \tFlink 每个Task 发布的时候,单独启动一个线程来执行: TaskManager(TaskExecutor) jvm ---> task(线程)\t\t * Spark 一个节点的抽象: Worker, 一个任务进行的抽象: Executor, 运行的Task: 一个线程
if(notifyDataAvailable)
final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
new PipelinedSubpartition
创建 createKnownInputChannel
在 ExecutionGraph 中,一个 ExecutionVertex 对应到要启动一个 Task
hasNewMail = true;
while (running)Thread.sleep(timeout);
创建默认的 MemoryStateBackend
StreamTaskSourceInput
this.partitionManager = checkNotNull(partitionManager);\t\tthis.taskEventPublisher = checkNotNull(taskEventPublisher);
只有当 状态为 DEPLOYING 的时候,才能执行如果状态不是 DEPLOYING,则释放归还 Slot
按照 out StreamEdge 的个数来构建多个 RecordWriter,不过一般就是一个
回调,在 ResultSubparition 通知 ResultSubparitionView 有数据可供消费
for(InputChannel inputChannel : inputChannels.values())
inputGate.notifyChannelNonEmpty(this);
if(throwable == null)
slotContext = AllocatedSlot
selector.isBroadcast()
启动 SourceStreamTask 的 SourceThread 线程
执行 Task 的线程
资源申请好了真正的开始部署task
ExecutionJobVertex consumerVertex = consumer.get(0).getTarget().getJobVertex();\t\tint maxParallelism = consumerVertex.getMaxParallelism();\t\treturn maxParallelism;
this.channelIOExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(\"channel-state-unspilling\")); }
if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state)设置为 Active 状态
Y进行 Record 的反序列化
jointFuture.thenRun(suspendedDefaultAction::resume);
获取该 targetChannel 对应的 BufferBuilder
processBufferOrEvent(bufferOrEvent.get());
添加一个BufferConsumer,用于读取写入到 MemorySegment 的数据
如果配置不为空
final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
通过反序列化得到 ExecutionConfig,从 ExecutionConfig 中可以的到所有算子相关的信息
创建 ResultSubPartition
调用 TaskDeploymentDescriptorFactory.createDeploymentDescriptor() 创建 DeploymentDescriptor 对象
获取一个 TaskManagerGateway
while(true)
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
flushRequested = buffers.size() > 1 || notifyDataAvailable;
正常结束
ResultSubpartition 是 ResultPartition 的一个子分区。每个 ResultPartition 包含多个 ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定
更新最后状态
final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
处理输入 inputProcessor = OneInputxxxx
headOperatorWrapper.endOperatorInput(inputId);
if(previous != null) {\t\t\t\tthrow new IllegalStateException(\"Result partition already registered.\");\t\t\t}
当前这条记录没有写完,申请新的 buffer 写入
初始化 MailboxProcessor注册processInput注意:processInput第一次调用是在代码执行到invokable.invoke()的runMailboxLoop方法流程中的mailboxDefaultAction.runDefaultAction(defaultActionContext) 触发消息才执行Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(二)的流程会来调用这里
取消超时 SlotRequest 的超时
\tInputStatus status = input.emitNext(output);
this.sourceThread = new LegacySourceFunctionThread();
if (bufferOrEvent.isPresent()
return requestNewBufferBuilder(targetChannel);
return bufferBuilder;
if(status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {\t\t\treturn;\t\t}
* Task 是所有 Task 的抽象! * 但是 在 Flink 的实现有很多种: * 1、StreamTask (Source Sink) * 2、BoundedStreamTask * 3、OneInputStreamTask 只是针对一个 DataStream * TwoInputStrewamTask union join * MultiInputStreamTask 超过2个// 当具体这个 Task 要执行的时候,其实就会去判断, //对应的这个 Task 到底对应的那个 ExecutoVertex 对应的启动类 Invokable 是谁//通过反射的方式来调用启动类执行
executionVertex.deploy();
由 RUNNING 状态改成: FINISHED 状态
* 调用 Execution 的 deploy() 方法部署 * 第一次执行失败,有可能是重试,但是这两次执行的是同一个 ExecutionVertex ,怎么区分呢? * 没执行一次 ExecutionVertex ,就封装一个 Execution 对象!
output.emitRecord(recordOrMark.asRecord());
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
backend = fromConfig;
if(outputPartitioner instanceof ConfigurableStreamPartitioner) {\t\t\tint numKeyGroups = bufferWriter.getNumTargetKeyGroups();\t\t\tif(0 < numKeyGroups) {\t\t\t\t((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);\t\t\t}\t\t}
assignExclusiveSegments();
通知readView,有数据可用了
检查配置中,是否配置了 StateBackend
InputChannel
分配 slot 资源
获取 MemoryManager
ensureControlFlowSignalCheck();
return mailboxProcessor.suspendDefaultAction();
所有 ReesultSubPartition 全部 flush
for(InputGate gate : inputGates) {\t\t\tgate.setup();\t\t}
校验
喚醒當前等待的綫程
tdd.loadBigData(blobCacheService.getPermanentBlobService());
inputGates[gateIndex] = inputGate;
notifyDataAvailable();
设置失败信息
获取 slot 申请消息
先生成一个 ShuffleDescriptor 数组
if(logicalSlot.tryAssignPayload(this))
加入队列中\t\t\t * 既然将 有数据可用的channel 加入到 inputChannelsWithData,\t\t\t * 那就证明,一定有其他的什么角色来从这个队列中获取 可用的channel 来消费数据
初始化 RpcTaskOperatorEventGateway
return InputStatus.END_OF_INPUT;
提交 Task
其实这个 output 就是负责完成这个 StrewamTask 的所有数据的输出输出到 ResultPartition初始化输出 ChannelSelectorRecordWriter
broadcastEmit(record);
创建 ResultPartition 实例
进行注册
NoOpChannelStateWriter
启动的时候runMailboxLoop告诉 MailBox 先暂停 loop
做了一个包装
poison mail 的消息Nsource有数据的时候走这里
创建算子运行实例类
构造 StreamTask 实例注意最后一个参数:每个 StreamTask 都会生成一个 TaskMailboxImpl 对象
异步提交任务
taskStateManager.close();
如果之前已经注册过,则报错
return taskSlot.getMemoryManager();
readView.notifyDataAvailable();
userFunction.run(ctx);
gate.setup();
FlinkKafkaConsumerBase.run(ctx);
一个 InputGate 中,根据需要上游的 几个 Task 拉取数据,就会有多少个 InputChannel
ChannelStateWriterImpl
if(!currentExecution.tryAssignResource(slot))
ResultSubpartition
return slotContext.getTaskManagerGateway();
部署 Task 的时候,也有可能会报错!* 1、slotAssigned* 2、deploymentHandle* 3、slotExecutionVertexAssignment
遍历生成
FutureUtils.assertNoException(assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
创建该 InputGate 中的多个 InputChannel
factoryClassName = \"org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory\";
加入 独占buffer池BufferManager类中实现
PipelinedSubpartition.flush();
* 注释: 真正运行用户的 Operator\t\t\t * 1、如果你使用:env.socketTextStream() 则调用: SocketTextStreamFunction\t\t\t * 2、如果你使用:Kafka数据源, 则调用: FlinkKafkaConsumerBase\t\t\t * ......\t\t\t * function --> transformation ---> streamOperator\t\t\t * headOperator.run();
partition.setup();
Ncheckpointed
\treturn localBufferPool;
如果输入还有数据,并且 writer 是可用的,这里就直接返回了
先生成一个容器数组
用键组的数量(也就是最大并行度)初始化分区
* 注释: 注册 ResultPartition 启动好了之后,会注册在 ResultPartitionWriter 中\t\t * partitionManager 负责这个 Task 之上的所有的数据的输出\t\t * 当前这个 Task 输出的所有数据就被抽象成一个整体: ResultPartition\t\t * 这个Task输出的数据有可能要被分发到下游的多个Task,就证明产出多个分区: ResultSubpartition\t\t * ResultPartition 包含多个 ResultSubpartition\t\t * -\t\t * TaskManager - TaskExecutor - ResultPartitionManager(管理多个 ResultPartition)
return sourceReader.pollNext(currentMainOutput);
jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());\t\t\t\ttaskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
创建 ResultPartition
return taskManagerGateway;
执行任务部署
获取线程状态
注册输入分区
operatorChain.endHeadOperatorInput(1);
queueChannel(checkNotNull(channel));
\tcurrentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
添加 BufferConsumer,说明已经有数据生成了
当前任务的 Task 信息
outputFlusher.start();
serializer.serializeRecord(record);
return InputStatus.NOTHING_AVAILABLE;
return new NonRecordWriter<>();
接:Flink源码——Job 提交、部署流程源码分析之 Slot 管理(申请和释放)
final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
timeout =100
注册状态
更新 backlog 的数量,只有 buffer 才会使得 buffersInBacklog + 1,事件不会增加 buffersInBacklog\t\t
if (toNotify != null)
StreamOneInputProcessor.emitNext
mailboxExecutor.yield();
for(InputGate inputGate : inputGates)inputGate.close();
SourceStreamTask
queue.addFirst(mail);
创建 SingleInputGate
inputGate.close();
初始化 InputGate
writer.open();
流式处理
进行 record 输出
if(taskSlot != null)
释放内存资源
BroadcastRecordWriter
recordWrites.size() == 0
return new MultipleRecordWriters<>(recordWrites);
if(currentRecordDeserializer != null)
loop();
* 注释: 如果注册成功,则通过一个线程来运行 Task* 当初在初始化 Task 对象的时候,构造方法的最后一句代码,其实就是初始化一个线程* 一台TaskManager 抽象的 slot 32 16 64
TwoInputStreamTask
deployAll(deploymentHandles)
循环为每个 DeploymentHandle 去分配资源给自己对应的 ExecutionVertex
N释放资源
根据 Task 获取 TaskSlot
batch.addFirst(mail)
OutputMetrics
创建 SingleInputGate
ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
InputStatus status = input.emitNext(output);
\tdoRun();
重新整合卸载的数据
创建一个 LocalBufferPool
bufferBuilder = requestNewBufferBuilder(targetChannel);
当初在提交一个 job 之后,会先启动 JobMaster,在初始化 JobMaster\t * 同时创建一个 Shceduler = DefaultScheduler\t * 在被创建的时候: 同时会把 JobGraph 变成 ExecutionGraph
StreamTask.processInput
遍历的部署每一个 Task
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
suspendedDefaultAction = null;
初始化一个线程:LegacySourceFunctionThread这是 source 用于产生 data 的一个线程
\treturn consumableNotifyingPartitionWriters;
检查和 ResourceManager 的链接是否为空
ShuffleDescriptor[] shuffleDescriptors = inputGateDeploymentDescriptor.getShuffleDescriptors();
return executionGraph.getAllVertices().get(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()];
while(!queue.isEmpty())间隔释放资源
suspendedDefaultAction = new DefaultActionSuspension();
提交 Task 让 TaskManager 启动 第一个参数: TaskDeploymentDescriptor 包含启动当前这个 Task 所需要的一切信息
CAS
\tmemoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
while (result.isFullBuffer())
使用
slot.releaseSlot(new FlinkException(\"Actual state of execution \" + this + \" (\" + state + \") does not match expected state DEPLOYING.\"));
flushTargetPartition(targetChannel);
获取当前 StreamTask 的输出 SreamEdge
启动 ResultPartitionWriter 和 InputGate
for(ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters)partitionWriter.close();
创建 ShuffleIOOwnerContextshuffleEnvironment在 TaskExecutor 初始化的时候,就已经创建好了
\tfor(int partitionIndex = 0; partitionIndex < resultPartitions.length; partitionIndex++)
运行 SourceOperator
inputChannel instanceof RemoteRecoveredInputChannel
流分区\t\t\t subpartitions.length = 下游 Task 的个数
此时调用以上流程的这个方法
AbstractFileStateBackend
这是volatile的参数为true
partitionManager.registerResultPartition(this);
if (checkpointedInputGate.isFinished())
\tthis.thread.start();
((BoundedOneInput) wrapped).endInput();
* slot = SingleLogicSlot\t\t\t * taskManagerGateway = RPCTaskManagerGateway\t\t\t * 一台物理从节点
创建一个用户加载用户代码的类加载器
初始化一个 ResultSubpartition 数组
ProcessingTime
\tAbstractInvokable invokable = null;
构建的 DeploymentHandle
invokable.invoke();
flush 到对应的 ResultPartition 中targetChannel = InputChannel\t\ttargetPartition = ResultPartition
recordDeserializers[channelIndex] = null;
启动任务的时候需要的待恢复的数据
创建一个 BufferManager
TaskSlot<T> taskSlot = getTaskSlot(allocationID);
\t\t * 根据参数 state.backend 来创建响应的 StateBackend\t\t * -\t\t * 1、MemoryStateBackend 把状态存储在job manager的内存中\t\t * 2、FsStateBackend 把状态存在文件系统中,有可能是本地文件系统,也有可能是HDFS、S3等分布式文件系统\t\t * 3、RocksDBStateBackend 把状态存在 RocksDB 中\t\t * -\t\t * 按照我们的配置,一般获取到的是 FsStateBackend
notifyDataAvailable = insertAsHead || finish || shouldNotifyDataAvailable();
userCodeClassLoader = createUserCodeClassloader();
获取下游的 消费者个数
构建 Task
if(fromConfig != null)
CountingOutput.collect()
TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
如果之前队列中没有channel,这个channel加入后,通知等待的线程
创建 ResultPartitionDeploymentDescriptor
DefaultScheduler
InputStatus status = inputProcessor.processInput();
初始化 TaskStateManagerImpl Task 状态管理
if(fromApplication == null)
处理输入
每个 InputGate 会包含一个以上的 InputChannel,和 ExecutionGraph 中的 ExecutionEdge 一一对应, 也和 ResultSubpartition 一对一地相连,即一个 InputChannel 接收一个 ResultSubpartition 的输出
if(notifyDataAvailable)
Task.run()
\tmemoryManager.releaseAll(invokable);
获取配置
Constructor<? extends AbstractInvokable> statelessCtor;statelessCtor = invokableClass.getConstructor(Environment.class);
inputChannelsWithData.add(channel);
InputMetrics
for(final DeploymentHandle deploymentHandle : deploymentHandles)
创建 TaskDeploymentDescriptor = tdd
初始化一个容器
初始化 ResultPartitionWriter
分配 slot 资源
final List<List<ExecutionEdge>> consumers = partition.getConsumers();
部署 Task 报错处理
调用 deployAll() 部署任务
批处理
添加到部署队列
\tbufferBuilders[targetChannel] = bufferBuilder;
输入已经处理完了,会调用这个方法一般不会走到这里
this.connectionId = checkNotNull(connectionId);\t\tthis.connectionManager = checkNotNull(connectionManager);
为 InputChannel 分配 Buffer, 直接为基于信用的模式将独占缓冲区分配给所有远程输入通道。
N创建 远程RemoteRecoveredInputChannel
计算逻辑处理
factory = clazz.newInstance();
从 LocalBufferPool 中请求 BufferBuilder
* 注释: 初始化 StreamTask 的时候,初始化 MailboxProcessor, 同时,执行 StreamTask 的 processInput() 方法 * 1、如果为 SourceStreamTask 的话,processInput 方法会启动 SourceStreamTask 的 sourceThread * 2、如果为其他的非 SourceStreamTask 的话,则根据情况(StreamOneInputProcessor 或者 StreamTwoInputProcessor)处理输入情况 * - * 第二个参数:TaskMailboxImpl * 第三个参数:SynchronizedStreamTaskActionExecutor
获取该 ResultPartitionWriter具体实现:ConsumableNotifyingResultPartitionWriterDecorator
获取数据
final ArrayDeque<Buffer> exclusiveBuffers --->全局缓冲池中当前可用的排他缓冲区。final ArrayDeque<Buffer> floatingBuffers--->固定缓冲池中当前可用的浮动缓冲区。
output.collect(reuse.replace(element));
处理启动结果,如果启动有错,则进行错误处理。
把原来record的数据序列化结果
failure != null
@SuppressWarnings(\"rawtypes\
StreamSourceContexts.collectWithTimestamp()
获取 Job 和 Task 信息
redistributeBuffers();
for(ResultPartitionWriter partition : producedPartitions) {\t\t\tpartition.setup();\t\t}
executingThread.setContextClassLoader(userCodeClassLoader);
//readView 是 ResultSubPartition 的消费者视图 对象 // 下游的一个Task 可能会消费上游的多个Task的某一个分区的数据。 // 上游个任意一个Task的任意一个分区叫做: ResultSubPartition, // 这个 ResultSubPartition 对应一个消费者: PipelinedSubpartitionView
if(type.isBlocking())
waitInFlightInputsFinished();
获取到 state.backend 的配置
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
Flink源码(1.11.x)Job 提交、部署流程源码分析之Task 部署、提交、執行(一)
releaseResources();
increaseBuffersInBacklog(bufferConsumer);
启动ResultPartition
获取流分区器
报错处理
return statelessCtor.newInstance(environment);
notifyDataAvailable = !isBlockedByCheckpoint && buffers.size() == 1 && buffers.peek().isDataAvailable();
if(state == SCHEDULED || state == CREATED)
检查配置中,是否配置了 StateBackend,如果配置了 StateBackend, 则代码走这儿,一般企业中,用 FsStateBackend
如果之前的状态是 SCHEDULED 或者 CREATED, 现在改成 DEPLOYING
if(desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined())
\tchannelStateReader.close();
发送
final String backendName = config.get(CheckpointingOptions.STATE_BACKEND);
allocatedSegments.remove(owner);segments.clear();
获取 ExecutionGraph 中的 ExecutionVertex
为这个 Task 的 InputGate 中的 InputChannel 分配 BufferPool
SourceStreamTask.processInput
SingleInputGate[] inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
获取该 StreamTask 的输出 StreamEdge 集合
一个 ResultParition 关联到一个 ResultPartitionWriter
把申请的 slot分配到每个ExecutionVertex上
this.stateBackend = createStateBackend();
return markExistingSlotActive(taskSlot);
SingleInputGate.setup()---> * Buffer MemorySegment * 流式计算引擎: 上游Task执行完毕一条数据的计算之后,就会发送这条数据的计算结果给下游Task * 到底怎么给,规则是由 StreamPartitioiner 来指定的 * 一条数据在一个Task 执行完毕之后,就要发送给下游个另外一个Task * 这个过程,这个网络数据传输过程,是由 Netty 支持的,具体是由 IntputChannel 实现Buffer Channel
sourceThread.setTaskDescription(getName());\t\tsourceThread.start();
* 内部会初始化一个执行线程。一个Task 是线程级别的执行粒度\t\t\t * 当初 JobMaster 提交 Task 提交过来的时候。其实是 : tdd\t\t\t * 最终经过一系列的初始化,准备,校验,等等各种操作,把 TDD 转变成 Task
BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);
releaseDeserializer(channelIndexes.get(bufferOrEvent.getChannelInfo()));
一个 out StreamEdge 来构建 一个 RecordWriter大概率 createRecordWriter() 方法的返回值是: ChannelSelectorRecordWriter
获取配置中的 StateBackend
final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
创建 StateBackend
初始化 ResultPartitioner具体实现是: ResultPartition and ResultSubPartition一般来说,一个 Task 就只有一个 ResultPartition
inputChannelsWithData.notifyAll(); toNotify = availabilityHelper.getUnavailableToResetAvailable();
allBufferPools.add(localBufferPool);
at least once
查看/获取 用户设置的时间语义(ProcessingTime, IngestionTime, EventTime)
注册放入Map
result = serializer.copyToBufferBuilder(bufferBuilder);
executingThread.start();
判断这个channel是否已经在队列中
ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
初始化 RpcInputSplitProvider
assignAllResources(deploymentHandles)
((RemoteRecoveredInputChannel) inputChannel).assignExclusiveSegments();
Y创建 本地LocalRecoveredInputChannel
CAS改变状态
outputFlusher.run();
\tif(headOperatorWrapper != null)
if(isMailboxThread())
注册 ResultPartitionWriter
executionVertex.tryAssignResource(logicalSlot);
每个 InputGate 会包含一个以上的 InputChannel,和 ExecutionGraph 中的 ExecutionEdge 一一对应, 也和 ResultSubpartition 一对一地相连,即一个 InputChannel 接收一个 ResultSubpartition 的输出//消费的目标 ResultPartitionIDprotected final ResultPartitionID partitionId;//属于哪一个 SingleInputGateprotected final SingleInputGate inputGate;
if(suspendedDefaultAction == null)
唤醒notEmpty.await()的地方 见:Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(二)
ChannelStateWriteRequestExecutorImpl.run
如果可以通过 currentRecordDeserializer 反序列化得来结果
用于请求/回收*专用或浮动缓冲区的通用缓冲区管理器。private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue() ---> 该队列中,放置的是: NetworkBuffer(封装了 MemorySegment)
flushAll();
这个线程,在创建 Task 对象的时候,就已经会初始化好了\t\t经过转换,最终,就是调用当前类的 run() 方法
targetPartition.flushAll();
controlMail 都是最先被处理的
不是 checkpoint 阻塞,buffers大小为 1, 数据可用
Timeout<K> timeout = timeouts.remove(key);timeout.cancel();
for(int i = 0; i < inputChannels.length; i++)
后续流程见:Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(二)
recordWriter.emit(serializationDelegate);
某个 channel 有可写入数了,该干活了。
根据 ExecutionVertexId 获取 ExecutionVertex
由 DEPLOYING 状态改成: RUNNING
线程状态切换
if(status == InputStatus.END_OF_INPUT) { controller.allActionsCompleted(); return; }
LockSupport.unpark(this);
0 条评论
下一页
为你推荐
查看更多