启动canal实例
2024-01-24 09:32:09 2 举报
AI智能生成
启动canal实例是指运行Canal开源项目所提供的服务,用于实现MySQL数据库和下游系统之间的数据同步。Canal是一种基于Apache Kafka的事件流式处理平台,可用于监控数据库变更,将变更记录以消息的形式发送到Kafka供下游系统消费。在启动Canal实例的过程中,需要配置相关的数据库连接信息、Kafka连接信息和安全认证等信息。完成配置后,Canal将启动一个工作进程,持续监控数据库的变更,并将变更数据发送到Kafka主题中。这个过程有助于实现跨系统的数据一致性,提高系统的性能和可用性。
作者其他创作
大纲/内容
定时将内存中的最新值刷到file中
启动定时工作任务
FileMixedMetaManager
metaManager.start()
eventStore.start()
eventSink.start()
构建binlogParser
初始化缓冲区transactionBuffer,设置元素个数=1024
定义EventTransactionBuffer的flushCallback
调用父类的构造方法进行初始化
buildParser();
binlogParser.start()
随机生成slaveId
构造Mysql连接
启动一个心跳线程
连接Mysql
参考instance position 配置
find start position
解析事件,记录lastPosition和lastEntryTime
定义sinkHandler
创建了一个单生产者的RingBuffer,这意味着只有一个线程(或事件发布者)会向它发布事件
RingBuffer.createSingleProducer
记录表结构
记录needDmlParse标识
SimpleParserStage.onEvent
1、简单解析阶段
如果event.isNeedDmlParse()为ture
这里记录下生产曾经出现过得异常问题
entry = logEventConvert.parseRowsEvent
event.setEntry(entry)
DmlParserStage.onEvent
2、DML解析阶段
flush();// 刷新上一次的数据
TRANSACTIONBEGIN
put(entry)
ROWDATA
put(entry);flush();
TRANSACTIONEND
transactionBuffer.add(event.getEntry())
发送semi ack
SinkStoreStage
放入 Event[] entries
更新游标位置
notEmpty.signal()
eventStore.tryPut
applyWait
如果eventStore满了
doSink(events)
eventSink.sink
flushCallback.flush(transaction)
flush()
3、存储接收阶段
配置处理阶段
multiStageCoprocessor.start()
sendRegisterSlave
告诉主服务器我开始接收binlog事件
读取接收到的事件
connector.getReceiveBufferSize()
使用环形缓冲区(RingBuffer)来传递数据
小于3,Thread.yield()
大于3,LockSupport.parkNanos(100 * 1000L * newFullTimes)
applyWait(++fullTimes)
异常处理:如果RingBuffer没有足够的容量
将数据发布到协处理器进行处理
coprocessor.publish(buffer)
开始dump数据
parseThread = new Thread
parseThread.start()
MysqlEventParser
eventParser.start()
CanalInstanceWithSpring.start方法
embededCanalServer.start
logger.info(\"## start the MQ producer: {}.\
canalServer.subscribe(clientIdentity)
logger.info(\"## the MQ producer: {} is running now ......\
是否能从流式数据中获取到positionRanges
第一次positionRanges为空,从eventStore中的第一条开始获取,获取指定大小的数据,阻塞等待其操作完成或者时间超时
记录到流式信息
下次判断流式数据不为空时,代表next+1开始获取
logger.info(\
获取数据
转化为flatMessages
构造partitionFlatMessages
partitionMessages[0] = flatMessage
如果是ddl
data.hashMode.autoPkHash = true
如果没有冒号,data.hashMode.tableHash = true
data.simpleName = pkHashConfig
按照逗号分割,遍历pkHashConfig
datas = partitionDatas.get(pkHashConfigs)
return data.hashMode
并且遍历datas,如果存在data.simpleName=库名.表名
否则return null
获取hashMode
int hashCode = table.hashCode()
partitionMessages[pkHash] = flatMessage
如果hashMode.tableHash=true
如果不是
遍历flatMessages,将每个flatMessage放到对应分区
如果配置了partitionHash
如果有异常,callback.rollback()
批量等所有分区的结果
使用分区线程池,并行发送消息
MQ生产者发送
callback.commit()
while循环
run()
executorService.execute(canalMQRunnable)
canalMQStarter.startDestination(destination)
processActiveEnter(); 单机不走ZK
runningMonitor.start();
创建并启动canal实例
0 条评论
回复 删除
下一页