flink-cdc main flow
2022-03-02 22:20:58
Outline/Content
handleSplitRequest
splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd)) -> splits.addAll(splitsChanges.splits())
createSplitFetcher()
while(run()){ runningTask.run() }
taskQueue
sourceReader.start(splits)
fetcher.addSplits(splitsToAdd)
FetchTask.run()
elementsQueue
currentReader.submitSplit(nextSplit)
assignSplits
MySqlSplitReader
operatorEventGateway.sendEventToCoordinator(new RequestSplitEvent(getLocalHostName()));
MySqlRecordEmitter
splits
MySqlSource
SourceCoordinatorContext.assignSplits -> gateway.sendEvent(addSplitEvent);
MySqlSnapshotSplitReadTask
currentReader.pollSplitRecords()
AddSplitsTask.run()
new FetchTask()
MySqlSourceEnumerator
MySqlBinlogSplitAssigner / MySqlHybridSplitAssigner
SourceOperator.open()
splitAssigner.getNext()
MySqlSourceReader
StatefulTaskContext.getQueue
SingleThreadFetcherManager(MySqlSplitReader).addSplits(splits)
SnapshotSplitReader
BinlogSplitReader
SourceReaderBase.addSplits -> splitFetcherManager.addSplits(splits)
SourceOperator.handleOperatorEvent -> sourceReader.addSplits
lastRecords
startFetcher()
SourceReaderContext.sendSplitRequest
MySqlSplitReader.fetch
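The nodes above suggest a hand-off through two queues: an add-splits task is placed on taskQueue, the fetcher loop runs it so the split reader receives its splits, and fetch tasks then push record batches onto elementsQueue for the source reader to drain. The following is a minimal single-threaded sketch of that hand-off, not the Flink or flink-cdc implementation. ToySplitReader, runFetcher, and the record strings are hypothetical names introduced only for illustration, and the real fetcher runs in its own thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FetcherSketch {

    /** Hypothetical stand-in for a split reader: keeps assigned splits, reads one per fetch. */
    static class ToySplitReader {
        private final Queue<String> splits = new ArrayDeque<>();

        // Mirrors the "splits.addAll(splitsChanges.splits())" node in the diagram.
        void handleSplitsChanges(List<String> splitsToAdd) {
            splits.addAll(splitsToAdd);
        }

        boolean hasSplits() {
            return !splits.isEmpty();
        }

        /** Returns two made-up records for the next split, or an empty batch if none is left. */
        List<String> fetch() {
            String next = splits.poll();
            if (next == null) {
                return List.of();
            }
            return List.of(next + "#record-0", next + "#record-1");
        }
    }

    /** Runs the toy fetcher loop to completion and returns everything the reader side drained. */
    static List<String> runFetcher(List<String> splitsToAdd) {
        ToySplitReader reader = new ToySplitReader();
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        BlockingQueue<List<String>> elementsQueue = new LinkedBlockingQueue<>();

        // Analogue of an add-splits task: queued first, executed by the fetcher loop.
        taskQueue.add(() -> reader.handleSplitsChanges(splitsToAdd));

        // Fetcher loop: queued tasks take priority; otherwise act as a fetch task.
        while (true) {
            Runnable task = taskQueue.poll();
            if (task != null) {
                task.run();
                continue;
            }
            if (!reader.hasSplits()) {
                break; // nothing queued and nothing left to read
            }
            elementsQueue.add(reader.fetch());
        }

        // Reader side: drain the batches that the fetcher produced.
        List<String> emitted = new ArrayList<>();
        List<String> batch;
        while ((batch = elementsQueue.poll()) != null) {
            emitted.addAll(batch);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runFetcher(List.of("split-0", "split-1")));
    }
}
```

The two queues decouple control from data in this sketch: split assignment travels as a task, while records travel as batches, so the reader side never touches the split reader directly.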