FLINK CDC 源码时序图-master代码
2022-03-27 21:49:50 8 举报
FLINK CDC 源码时序图
作者其他创作
大纲/内容
MySqlSnapshotSplitAssigner#onFinishedSplits把 ack 的split 加入 splitFinishedOffsets如果 allSplitsFinished: noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size()noMoreSplits(): remainingTables.isEmpty() && remainingSplits.isEmpty()并且上一次状态是 assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING并且 currentParallelism == 1 (并行度不为1不会立马设置状态 而是在 checkpoint 里面去设置)将 assignerStatus 的状态设置为下一个状态:INITIAL_ASSIGNING_FINISHED 或 NEWLY_ADDED_ASSIGNING_FINISHEDMySqlHybridSplitAssigner#onFinishedSplits同上MySqlBinlogSplitAssigner#onFinishedSplits 不做事情
E 唤醒 S Binlog
addSplits
enumerator 为空
start
restoreEnumerator1、从状态恢复 MySqlSplitAssigner2、创建 MySqlSourceEnumerator
S 向 E 请求 Split
添加到finishedUnackedSplits
S 向 E 请求 Snapshot Split Finished Size size
RecordsWithSplitIds<SourceRecord> fetch()
实现细节
run
Y
只有是MySqlHybridSplitAssigner才处理
getFinishedSplitInfos()
fetcher = createSplitFetcher()fetcher.addSplits(splitsToAdd)startFetcher(fetcher)
return null
snapshotState
recordsWithSplitId = this.currentFetch
sourceReader.pollNext
如果没拿到就 break
record !=null
FetchTask 实现 SplitFetcherTask
fetcher.addSplits(splitsToAdd)
处理 snapshot唤醒请求
读取 split 的 record
N
S 向 E 上报收 Binlog Reader 已经停止
请求获取BinlogSplitMetaRequestEvent此时会产生一个 GroupId
fetcher == null
handleSourceEvent
reportFinishedSnapshotSplitsIfNeed
MySqlSnapshotSplitAssigner#waitingForFinishedSplitsallSplitsFinished: noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size()noMoreSplits(): remainingTables.isEmpty() && remainingSplits.isEmpty()MySqlHybridSplitAssigner#waitingForFinishedSplits同上MySqlBinlogSplitAssigner#waitingForFinishedSplits返回 false
isSnapshotSplit
拿到当前 split 的 nextSplit
包装成 RecordsWithSplitIds
fetcher 不存在就创建,存在就 addSplit
E 向 S 反馈 Snapshot Split Finished Size
start()
onSplitFinished
ReaderRegistrationEvent
处理 Finished Size
elementsQueue.isEmpty()
E 向 S 发送一个 Split
构造方法
InputStatus.END_OF_INPUT
reportFinishedSnapshotSplitsIfNeed 同上
添加到unfinishedSplits
数据循环完毕
resetToCheckpoint恢复 enumerator
requestBinlogSplitMetaIfNeeded
suspendedBinlogSplit 不为 null代表这个 source 是处理 binlog split 的
enumerator.start()
wakeupBinlogReaderIfNeed
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue
SourceReaderBase
checkSplitOrStartNext
binlogReaderIsSuspended = true;
assignSplits
没有 FinishedSplit 会抛异常
替换suspendedBinlogSplit
sendSplitRequest同上
restoreEnumerator
FinishedSnapshotSplitsRequestEvent
split =null || moveToNextSplit =false
pollNext
elementsQueue
唤醒 reader 进行 snapshot 读取
把完成的 split 的 BinlogOffset 封装成LatestFinishedSplitsSizeEvent
发送SuspendBinlogReaderEvent
把 splitsToAdd List 加入 assignedSplits map
onFinishedSplits
suspendedBinlogSplit = null
BlockingDeque<SplitFetcherTask> taskQueue
fillMetaDataForBinlogSplit
waitingForFinishedSplits
MySqlSplitReader 实现 SplitReader
emitRecord
executors.submit(fetcher)
SourceEventWrapper
startFetcher(fetcher)
还有更多的SplitsAssignment或者plitFetcherManager 里面还有Fetch
BinlogSplitReader 实现 DebeziumReader
binlogReaderIsSuspended = false
给所有 source 发送
从 FinishedSplit 里面封装 binlogSplitMeta
把 binlogSplit addSplit
registerReader() 注册到 coordinator
checkpointCoordinator
SplitFetcher
完成的split是mySqlSplit
handleSuspendBinlogReaderAckEvent
E 请求唤醒 S 去读取 snapshot
JDBC 去读一批数据
handleSplitRequest
moveToNextSplit
MySqlSourceEnumerator
nextSplitId == null
binlogSplitMeta==null 则从 FinishedSplitInfos 初始化
runOnce()
suspendBinlogReaderIfNeed
taskQueue.offer(task)
发送请获取 LatestFinishedSplitsSize这个 size 是用来判断 snapshot 是否都完成用的,另只有在 snapshot 完成才解析 binlog 所以这个 size 很好有必要
代表 当 binlog 被停止之后,全量同步完了就要重新初始化 MySqlBinlogSplit并加入 unfinishedSplits 丢给 fetch
初始化 currentReader创建 SnapshotSplitReader 或者 BinlogSplitReader
binlogSplit.isCompletedSplit()
MySqlSplit nextSplit = splits.poll()
N是binlog
SnapshotSplitReader 实现 DebeziumReader
currentReader.submitSplit(nextSplit)
sendBinlogMeta
notifyCheckpointComplete
while(!closed)
unningTask.run()
MySqlSourceReader
拿到 split 发配到 source
assignSplits()
MySqlSnapshotSplitAssigner#open1、初始化 chunkSplitter2、发现新表3、起一个异步线程对` remainingTables` 的表进行 split 切分4、把切分好的 splits 放入 `remainingSplits`5、把切分好的 tableId 从 `remainingTables` 移除MySqlHybridSplitAssigner#open同上MySqlBinlogSplitAssigner#open 啥也没干
SplitFetcherManager
split
splitFetcherManager.addSplits(splits)
S 向 E 获 取 Split Meta
继续请求
wakeUp(true)
N给所有 source 发送
isCompletedSplit
return InputStatus.MORE_AVAILABLE
如果 allSplitsFinished 为 false 就抛异常
addReader
向 assigner 获取finished split 大小
任务启动 E 要求 S 上报未 ack 的 split
stopBinlogSplitReader = true
WakeupReaderEvent
open()
Queue<MySqlSplit> splits = new ArrayDeque<>()
currentReader.pollSplitRecords()
splitAssigner.notifyCheckpointComplete
handleEventFromOperator
runningTask = taskQueue.take()
SuspendBinlogReaderEvent
循环readersAwaitingSplit
splitAssigner 状态是 SUSPENDED
FinishedSnapshotSplitsAckEvent
发送SuspendBinlogReaderAckEvent
下一个split
S 上报给 E 未 ack 的 splitFinishedSnapshotSplitsAckEvent
splitStates.size()==0
加入uncompletedBinlogSplits
只有 binlogSplit Reader 才处理
binlogSplit 追加 FinishedSplitInfo
splitAssigner.wakeup()
没有 fetch 到
拿出对应 GroupId 的 Meta
while(true)
addSplits(splits)
调用 父类的 addSplit 把 unfinishedSplits 丢给 Fetch
poll
mysqlSource
handleSplitsChanges
getNextFetch
recordsWithSplitId==null
fetcher = createSplitFetcher()
super.addSplits(unfinishedSplits)
处理 binlog 唤醒请求
RequestSplitEvent
LatestFinishedSplitsSizeRequestEvent
任务启动 E 告诉 S Assigner 状态是 SUSPENED
处理 ack 请求
sendSplitRequest 同上
处理 size 请求
wakeup
emitNext
readersAwaitingSplit.add(subtaskId)
initializeState恢复 readerState
splitAssigner.getNext()
AddSplitsTask 实现 SplitFetcherTask
moveToNextSplit有 split 返回 true没有 split 则返回false
splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd))
createEnumerator
从 Event 中获取 splits 的 finishedOffsets
TreeSet<Integer> readersAwaitingSplit
定期调度 syncWithReaders
startFetcher
open
isIdle = false
addSplit 同上
从uncompletedBinlogSplits 获取 binlogSplit
把接受到的 splitStates 加入 splitStates
return pollNext
Source
循环splits
isSnapshotReadFinished
SourceCoordinator
SourceOperator
InputStatus.MORE_AVAILABLE
拿到 finishedSplits
new SplitFetcher
sendSplitRequest
sourceReader.addSplits(恢复的 state)
topBinlogSplitReader = false
调用子类方法
判断可能修改 assigner 状态
isSuspended
splits.addAll
调用父类
InputStatus.NOTHING_AVAILABLE
状态从 SUSPENDED 变为 NEWLY_ADDED_ASSIGNING
checkNeedStopBinlogReader
更新 suspendedBinlogSplit 的 finishedSplitsSize 和 isSuspended = false
return currentFetch
包装成 BinlogSplitMetaEvent 发送
E 向 S 发送 Split Meta
同这里
sourceReader.start()
addSplits(binlogSplit)
splitAssignerisSuspended
MySqlSplitAssigner
SplitFetcherTask runningTask
initReader()
currentReader == null || currentReader.isFinished()
MySqlBinlogSplitAssigner#notifyCheckpointComplete 空实现MySqlSnapshotSplitAssigner#notifyCheckpointCompletecheckpointIdToFinish != null 并且 assignerStatus != Finished 状态并且 noMoreSplits() 并且 assignedSplits.size() == splitFinishedOffsets.size()设置 assignerStatus = assignerStatus.onFinish();MySqlHybridSplitAssigner#notifyCheckpointComplete同上
split 是否完结
发送
addSplit
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue
RecordsWithSplitIds lastRecords = splitReader.fetch()
readersAwaitingSplit移除 taskId
如果 binlogSplit 没有完成,没有完成是指的 Snapshort 没有完成,所以需要反复获取 split 的最高水位
把 split加入finishedUnackedSplits
0 条评论
回复 删除
下一页