启动canal实例
2024-01-24 09:32:09 2 举报
AI智能生成
启动canal实例是指运行Canal开源项目所提供的服务,用于实现MySQL数据库和下游系统之间的数据同步。Canal是一种基于Apache Kafka的事件流式处理平台,可用于监控数据库变更,将变更记录以消息的形式发送到Kafka供下游系统消费。在启动Canal实例的过程中,需要配置相关的数据库连接信息、Kafka连接信息和安全认证等信息。完成配置后,Canal将启动一个工作进程,持续监控数据库的变更,并将变更数据发送到Kafka主题中。这个过程有助于实现跨系统的数据一致性,提高系统的性能和可用性。
作者其他创作
大纲/内容
runningMonitor.start();
processActiveEnter(); 单机不走ZK
embededCanalServer.start
CanalInstanceWithSpring.start方法
metaManager.start()
FileMixedMetaManager
启动定时工作任务
定时将内存中的最新值刷到file中
eventStore.start()
eventSink.start()
eventParser.start()
MysqlEventParser
构建binlogParser
初始化缓冲区transactionBuffer,设置元素个数=1024
buildParser();
调用父类的构造方法进行初始化
定义EventTransactionBuffer的flushCallback
binlogParser.start()
parseThread = new Thread
构造Mysql连接
随机生成slaveId
启动一个心跳线程
连接Mysql
find start position
参考instance position 配置
定义sinkHandler
解析事件,记录lastPosition和lastEntryTime
multiStageCoprocessor.start()
RingBuffer.createSingleProducer
创建了一个单生产者的RingBuffer,这意味着只有一个线程(或事件发布者)会向它发布事件
配置处理阶段
1、简单解析阶段
SimpleParserStage.onEvent
记录表结构
记录needDmlParse标识
2、DML解析阶段
DmlParserStage.onEvent
如果event.isNeedDmlParse()为ture
entry = logEventConvert.parseRowsEvent
这里记录下生产曾经出现过得异常问题
event.setEntry(entry)
3、存储接收阶段
SinkStoreStage
transactionBuffer.add(event.getEntry())
TRANSACTIONBEGIN
flush();// 刷新上一次的数据
ROWDATA
put(entry)
TRANSACTIONEND
put(entry);
flush();
flush();
发送semi ack
开始dump数据
sendRegisterSlave
sendBinlogDump(binlogfilename, binlogPosition)
告诉主服务器我开始接收binlog事件
connector.getReceiveBufferSize()
读取接收到的事件
coprocessor.publish(buffer)
将数据发布到协处理器进行处理
使用环形缓冲区(RingBuffer)来传递数据
异常处理:如果RingBuffer没有足够的容量
applyWait(++fullTimes)
小于3,Thread.yield()
大于3,LockSupport.parkNanos(100 * 1000L * newFullTimes)
parseThread.start()
canalMQStarter.startDestination(destination)
executorService.execute(canalMQRunnable)
run()
logger.info("## start the MQ producer: {}.", destination)
canalServer.subscribe(clientIdentity)
logger.info("## the MQ producer: {} is running now ......", destination)
while循环
获取数据
是否能从流式数据中获取到positionRanges
第一次positionRanges为空,从eventStore中的第一条开始获取,获取指定大小的数据,阻塞等待其操作完成或者时间超时
eventStore.get(start, batchSize, timeout, unit)
记录到流式信息
下次判断流式数据不为空时,代表next+1开始获取
logger.info("getWithoutAck successfully, clientId:{} batchSize:{}. . . . . .
MQ生产者发送
MQMessageUtils.buildMessageData(message, buildExecutor)
转化为flatMessages
如果配置了partitionHash
构造partitionFlatMessages
遍历flatMessages,将每个flatMessage放到对应分区
如果是ddl
partitionMessages[0] = flatMessage
如果不是
获取hashMode
datas = partitionDatas.get(pkHashConfigs)
按照逗号分割,遍历pkHashConfig
data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr, '^'))
data.hashMode.autoPkHash = true
如果没有冒号,data.hashMode.tableHash = true
data.simpleName = pkHashConfig
并且遍历datas,如果存在data.simpleName=库名.表名
return data.hashMode
否则return null
如果hashMode.tableHash=true
int hashCode = table.hashCode()
partitionMessages[pkHash] = flatMessage
使用分区线程池,并行发送消息
sendMessage(message, partition)
批量等所有分区的结果
如果有异常,callback.rollback()
canalServer.rollback(clientIdentity, batchId)
callback.commit()
回调canalServer.ack(clientIdentity, batchId)
0 条评论
下一页