canal
2022-07-10 20:44:11 0 举报
canal
作者其他创作
大纲/内容
EntryType.ROWDATA
EventTransactionBuffer
putSequence 代表当前put操作最后一次写操作发生的位置
private Event[] entries;
entry.getEntryType()
MysqlEventParser
UPDATE_ROWS_EVENT
EntryPosition
executor.scheduleWithFixedDelay
SinkFunction
List<CanalEventDownStreamHandler> handlers
main方法
// 初始化报警机制 initAlarmHandler();
EntryType.ROWDATA
run()
controller = new CanalController(properties);
读取: canal.conf
解析
seek
flush();// 刷新上一次的数据 put(entry);
AclClientRPCHook
ROWS_QUERY_LOG_EVENT
handler.retry(events)
ROWDATA
QUERY_EVENT
store处于full后,retry时处理做一下处理
// 初始化metaManager initMetaManager();
CanalStarter
private int bufferSize = 1024; private int indexMask; private CanalEntry.Entry[] entries;
交给
CanalMQProducer
BinlogParser
QueryLogEvent
while (fetcher.fetch())
LogEventConvert
ackSequence代表当前ack操作的最后一条的位置
CanalEntry.Entry
使用connector.fork()重新创建一个MysqlConnection
canal.auto.scan.interval
MysqlConnector
EntryEventSink
TRANSACTIONEND
handler.after(events);
// 初始化eventParser; initEventParser();
连接相关
实例3
WRITE_ROWS_EVENT
/otter/canal/cluster/{registerIp + \":\" + port}注册
DefaultMQProducer defaultMQProducer
// 初始化eventSink initEventSink();
flush()
controller.satrt
EntryType.GTIDLOG
add(CanalEntry.Entry entry)
parseThread
CanalMQStarter
subscribe(ClientIdentity clientIdentity)
二
HEARTBEAT
spi
getSequence代表当前get操作读取的最后一条的位置
start2:持有
new CanalStarter(properties)
isGTIDMode
config.getMode()
提交
一
// 初始化eventStore initEventStore();
CanalLauncher
private int bufferSize = 16 * 1024; private int bufferMemUnit = 1024; // memsize的单位,默认为1kb大小 private int indexMask; private Event[] entries;
实例化1
CanalAdminApplication
实例化2
GTID_LOG_EVENT
start3:
实例化
将Entry 转成event
fork
TABLE_MAP_EVENT
put(entry); flush();
TransactionFlushCallback
start
对应每个instance启动一个worker线程
CanalInstanceWithManager
SpringCanalInstanceGenerator
心跳
tryPut(List<Event> data)
canalStater.start();
CanalInstanceGenerator
构建
TableMapLogEvent
CanalServerWithEmbedded
embededCanalServer.start();
put(entry);非dml 直接flush();
LogDecoder
canal.destinations
执行dump前的准备工作
PlainCanalInstanceGenerator
/api/v1/config/server_polling:findServer
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);/
AbstractCanalInstance
CanalController
start4
put(entry); flush();
start1
MysqlConnection
TRANSACTIONBEGIN
List<CanalEntry.Entry> entrys
canalServer.subscribe(clientIdentity);
重新链接,因为在找position过程中可能有状态,需要断开后重建
循环处理
erosaConnection.reconnect();
dump
preDump(erosaConnection);
serverId
show variables like 'server_id'
events
LogEvent
rpcHook
handler.before(events);
ErosaConnection
获取最后的位置信息
最后设置到
最终委托给
erosaConnection.connect()
instanceGenerator.generate(destination);
ThreadPoolExecutor sendPartitionExecutor;
加入
CanalRocketMQProducer
这里的心跳,感觉是个假的心跳,并没有用到connection相关的内容。启动一个定时任务,默认3s发送一个心跳的binlog给sink阶段,表名parser还在工作。在sink阶段,会把心跳的binlog直接过滤,不会走到store过程。
0 条评论
下一页