Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(二)
2022-04-08 16:42:49 19 举报
Flink源码——Job 提交、部署流程源码分析之Task 部署、提交、執行(二)
作者其他创作
大纲/内容
beforeInvoke();
为 chained outputs 创建收集器
为每一个 Operator 创建 OutputCollector
Y
NDirectedOutput
SourceFunction<?> source = headOperator.getUserFunction();
遍历每个输出边,给每个 outEdge 构造一个 RecordWriterOutput 实例
for(StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {\t\t\t@SuppressWarnings(\"unchecked\
linkOperatorWrappers(allOpWrappers);
return !batch.isEmpty();
source instanceof ExternallyInducedSource
\tmaybeMail = mailbox.tryTake(MIN_PRIORITY);
创建 OperatorWrapper
return Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates));
获取 OperatorChain 的第一个 Operator
pushToOperator(record);
return new UnionInputGate(inputGates.toArray(new IndexedInputGate[0]));
创建 InputGate
mailboxProcessor.runMailboxLoop();
默认输出组件: StreamTaskNetworkOutput
if(!hasNewMail)
throw new UnsupportedOperationException(\"Unrecognized Checkpointing Mode: \" + config.getCheckpointMode());
构建 OperatorChain 对象
runMailboxLoop();
while(isDefaultActionUnavailable() && isMailboxLoopRunning())
if(containingTask.getExecutionConfig().isObjectReuseEnabled())
mail = queue.pollFirst()) != null
init();
初始化得到 output 输出数组集合
IndexedInputGate[] sortedInputGates = Arrays.stream(inputGates).flatMap(Collection::stream)\t\t\t.sorted(Comparator.comparing(IndexedInputGate::getGateIndex)).toArray(IndexedInputGate[]::new);
第一步: 初始化输入
接:Flink源码(1.11.x)Job 提交、部署流程源码分析之Task 部署、提交、執行(一)
for(int i = 0; i < outEdgesInOrder.size(); i++)
所有 OperatorWrapper 对象集合,把headOperatorWrapper 放入到最后, 其实一个 OperatorChain 中,包含了多个 Operator,最终都被封装成 OperatorWrapper 放入这个集合中
为每一个 Operator 构造 RecordWriterOutput
new CheckpointBarrierTracker
CopyingChainingOutput
运行 mail
batch.addLast(mail);
处理元素userFunction是用户自定义的算子
将上一个遍历出来的 OpeartorWrapper 设置为当前 OpeartorWrapper 的下一个
检查源是否实际上在诱发检查点,而不是触发
triggerCheckpointAsync
DataOutput<IN> output = createDataOutput();
hasNewMail = false;return !batch.isEmpty();
通过配置获取: StreamOperatorFactory
StreamConfig configuration = getConfiguration();\t\tint numberOfInputs = configuration.getNumberOfInputs();
创建 StreamTaskInput = StreamTaskNetworkInput因为该 StreamTask 需要从上游拉取数据,进行消费,所以默认实现是: StreamTaskNetworkInput\t\t\t
N
迭代 previous OpeartorWrapper
构建 output 输出对象可能为: ChainingOutput 或者 CopyingChainingOutput
KeyedProcessOperator.processElement
1、如果是外部诱导源,就注册一个 savepoint 钩子 2、手动触发的 savepoint 逻辑和 checkpoint 公用相同的逻辑,唯一不同的点在于: savepoint 是手动触发, checkpoint 是定时调度触发
某算子计算完写出数据
以非阻塞方式接收邮件并执行。
headOperator = operatorChain.getHeadOperator();
InputGate[] unionedInputGates = Arrays.stream(inputGates).map(InputGateUtil::createInputGate).toArray(InputGate[]::new);
\tthis.serializer = serializer;
out.collect(evtTrdDtlReal);
\toperator.processElement(castRecord);
如果是 SourceStreamTask,构造 StreamOperatorFactory = SourceOperatorFactory
EXACTLY_ONCE
YCopyingBroadcastingOutputCollector
allOpWrappers.add(headOperatorWrapper);
ChainingOutput
以 forward topological order 链接 operator wrappers
Y 当前遍历出来的 OpeartorWrapper 设置为 上一个的 previous
if(containingTask.getExecutionConfig().isObjectReuseEnabled())
创建 DataOutput = StreamTaskNetworkOutput
actionExecutor.runThrowing(runnable);
while(isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent())
invokable.invoke();
MailboxExecutor mailboxExecutor = containingTask.getMailboxExecutorFactory().createExecutor(configuration.getChainIndex());
inputGates.size() == 1
YCopyingDirectedOutput
StreamOperatorFactory<OUT> operatorFactory = configuration.getStreamOperatorFactory(userCodeClassloader);
为每一个 StreamEdge 创建 Collector
* 所有 StreamTask 的基类。Task 是由 TaskManager 部署并执行的本地处理单元。 * 每个 Task 运行一个或多个{@link StreamOperator},这些 StreamOperator 形成 Task 的 OperatorChain * 链接在一起的 Operators 在同一线程中并因此在同一 stream partition 上同步执行。 * 这些链的常见情况是连续的 map / flatmap / filter 任务。 *任务链包含一个 \"head\" operator 和多个 chained operators *StreamTask 用 head operator 来定义,分为 one-input 和 two-input两种 Task,以及 sources,iteration heads 和 iteration tails。
List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
跳转到所有 StreamTask 的基类
将 这个 OperatorChain 中的多个 Operator 都串接起来allOperatorWrappers 中的 OperatorWrapper 都是反向存储的
获取 OperatorEventDispatcherImpl 实例
启动 StreamTask
创建 StreamOneInputProcessor
创建 CheckpointedInputGate
没有数据的时候阻塞task线程
if(processMail(localMailbox))
return inputGates.get(0);
default
StreamTask.invoke()
* 注释: 调用 Operator 的 processElement 来处理 castRecord 数据记录\t\t\t\t * 假设下一个算子是 keyBy, 则跳转到 : KeyedProcessOperator\t\t\t\t * 因为之后要 shuffle 了,所以之后就没有其他的 Operator 了\t\t\t\t * map() = StreamOperator = StreamMap = operator
Flink源码(1.11.x)Job 提交、部署流程源码分析之Task 部署、提交、執行(二)
Task.doRun()
SourceStreamTask
如果有多个输出,或者输出是定向的,需要将它们包装为一个输出
batch.pollFirst()
maybeMail.get().run();
执行 StreamTask 的初始化
見:Flink源码——Checkpoint 管理机制源码剖析
获取 StateBackend
return Optional.ofNullable(batch.pollFirst());
N 返回 UnionInputGate
operator.setKeyContextElement1(castRecord);
为 network outputs 创建收集器
if(previous != null)
WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
最后返回: StreamTaskNetworkInput
maybeMail = mailbox.tryTakeFromBatch()
通过 MailboxProcessor 来轮询 MailBox 处理 Mail
Y运行 Mail
* 1、如果有 mail 需要处理,这里会进行相应的处理,处理完才会进行下面的 event processing\t\t * 2、进行 task 的 default action,也就是调用 processInput()
AT_LEAST_ONCE
第三个参数:output 的初始化
執行checkpoint
\trunnable.run();
for(int i = 0; i < allOutputs.size(); i++) {\t\t\t\t\tasArray[i] = allOutputs.get(i).f0;\t\t\t\t}
跳转到下一个 Operator 来处理元素StreamMapkeyBy
获取
previous.setPrevious(current);
return allOutputs.get(0).f0;
private volatile boolean hasNewMail = false;
* 构建 OperatorChain 对象,里面会做很多事情 * 初始化 output 输出对象 * 主要做三件事情: * 1、调用createStreamOutput()创建对应的下游输出RecordWriterOutput * 2、调用createOutputCollector()将优化逻辑计划当中Chain中的StreamConfig(也就是数据)写入到第三步创建的RecordWriterOutput中 * 3、通过调用getChainedOutputs()输出结果RecordWriterOutput
NBroadcastingOutputCollector
创建 Operator
执行 mailbox 处理,正式工作 代码卡在这儿
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
new CheckpointBarrierUnaligner
if(selectors == null || selectors.isEmpty())
StreamEdge outEdge = outEdgesInOrder.get(i);
OneInputStreamTask
if(allOutputs.size() == 1)
CheckpointedInputGate inputGate = createCheckpointedInputGate();
处理 Mail
构建allOutputs
获取 headOperator
createInputGate()
\tcurrent.setNext(previous);
if(!mailbox.createBatch())
previous = current;
return this.mailboxProcessor::getMailboxExecutor;
创建得到一个 CheckpointedInputGate 数组
for(StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)
// 可以认为 接收数据线程中,要用到的 headOpeartor 终于被初始化了,其实到此为止,可以认为,在当前 OperatorChain 中要用到的各种组件都已经创建好了,可以接收数据,然后开始流式处理了。
最后创建的就是: CheckpointedInputGate
做这个检查是一种优化,只在预期的热路径中有一个volatile读,只在这个点之后才获得锁。第一次走if(!hasNewMail)是Y的逻辑
0 条评论
下一页
为你推荐
查看更多