03_flume和canal原理及源码分析
2024-05-23 21:20:07 3 举报
flume和canal原理及源码分析
作者其他创作
大纲/内容
将未take掉的event重新放回queue中queue.addHead(takeList.removeLast())
N
请求
启动所有相关核心组件canalInstance.start()
启动controller.start()
初始化instance configinitInstanceConfig()
ServerRunningMonitorrunning节点控制器,实现server HA
source
metaManager.start()
唤醒等待get数据的线程notEmpty.signal();
channel
我们可以从Nginx,本地文件或上一个flume等等数据源来采集数据
发送到sink阶段flushCallback.flush(transaction)
Selectorchannel选择器#getRequiredChannels#getOptionalChannels
从channel中获取eventchannel.take();
RingBuffer<MessageEvent>内存消息回环管理组件
端口
提交ack
subscribe1、启动MetaManager2、执行一下meta订阅3、获取client消费位置
cursor变化同步到zk
规则rules
以轮训方式发送到各sink上
eventStore.start()
kafka
发送BINLOG_DUMP命令
1、连接(1) 通过zk的watch监听server的状态(2) 进行failover切换
ChannelProcessorEvent处理器#processEventBatch
Avro
Connector启动监听 HTTP端口
InstanceConfigMonitor监听instance file的文件变化,触发instance start/stop等操作
移动ack指针到get指针ack
开启并行解析模式?parallel
若处理失败就找priority小的
MysqlMultiStageCoprocessor.publish()将log发送到内存缓存队列
开始dump数据erosaConnection.dump()
添加HEAD
开启backoff = true后将sink发送到黑名单
put放到CanalEntry数组中
FlumeHTTPServlet处理
GET
FileBackedTransaction#doTake
avro
提交offsetKafkaTransaction#doCommit
解析LogEventLogDecoder.decode对二进制进行解析
初始化服务端运行状态ServerRunningData
3、启动netty servercanalServer.start()
ZK
内嵌jetty
SUBSCRIPTION
HTTP Source
关闭时回调embededCanalServer.stop(destination)
开始store阶段eventStore.tryPut()
City
Log#rollback追加写入一个Rollback对象,表示回滚之前的event操作
初始化Event[]环形内存队列
sink()sinkData()
Elasticsearch
MysqlEventParser#start
BasicTransactionSemantics#put
Sink
netty handlerSessionHandler处理客户端订阅事件
main类com.alibaba.otter.canal.deployer.CanalLauncher
canal instance
写入成功?
数据源
Sink groupsprocessor.type = load_balance
ack
spillable memory
binlogParser.start()构造bin log parser
FileChannel保存未提交指针putList.offer(ptr)
不断dump bin logwhile (fetcher.fetch())
创建事务FileChannel#createTransaction
spooldir
结果
flush打包List<CanalEntry.Entry> transaction
通过SinkFunction进行sink
nio socketHANDSHAKECLIENTAUTHENTICATION
核心事务对象FileBackedTransactionLog:磁盘日志文件组件putList:待写入磁盘文件takeList:channel读取走的event
所有Event
放入takeList中takeList.offer(ptr)
Zookeeper
等待数据源重试
canal netty server
订阅事件如果创建成功就进行path初始化
创建netty serverCanalServerWithNetty.instance()
写入Take事件对象Log#take
channel不删数据等待重新发送
consumerAndRecords.get().commitOffsets();
eventSink.start()
获取channel绑定的事务channel.getTransaction()
遍历所有Channel
回滚
InterceptorChain拦截器处理链#intercept()如:添加HEADER
事务语义,数据不丢失
app
MySQL
multiplexing
Y 响应成功
netty handlerClientAuthenticationHandler处理客户端握手、鉴权
City = SH
加入到transactionBuffer内存环形缓冲队列中这里主要做事务处理,其实就是将事务中的一堆数据打包,发送transactionBuffer.add()
3、获取数据connector.getWithoutAck(batchSize)
Kafka
eventParser.start()
根据channel获取events
canal embedded server
遍历所有events
记录binglog position
GET / ACK / ROLLBACK
canalStater.start()
GTID模式举例
是否发送成功
启动instance配置监听monitor.start()
将event从putList移动到Channel全局queue中queue.addTail(putList.removeFirst())
deployerbin/startup.sh
2、提交事务tx.commit();
1、开启事务tx.begin();
MemoryEventStoreWithBuffer
3、回滚事务tx.rollback();
alarmHandler.start()
回调
replicating
不提交offset清空内存队列等待重新发送
开始sink阶段eventSink.sink()
存在row记录doSink(events)
ROLLBACK
初始化全局参数设置initGlobalConfig()
FlumeEventPointer记录每条记录在磁盘中的位置
原理
initRunning()
processActiveEnter()
黑名单超过 maxpenalty 后再次尝试这里的sink
LogFile.Writer#put通过NIO写入log文件
获取数据,移动get指针doGet
Application
2
memory
SUBSCRIPTIONUNSUBSCRIPTIONGETACKROLLBACK
目标存储
心跳事件剥离HeartBeatEntryEventHandler#before
server运行状态监听器ServerRunningListener
2、订阅
通过事务写入FileChannel.FileBackedTransaction#doPut
Canal client我们的客户端
initCid(path)初始化系统目录创建整个canal的工作节点
切换active失败时的回调
同时将sink发送到黑名单
创建DirectLogFetcher基于socket的logEvent实现
启动工作线程parseThread
Flume原理
interceptor
store存储(目前仅memory)
selector选择channel
源码
创建一个个logBufferLogBuffer buffer = fetcher.duplicate();
HTTP 请求
1、优先启动embeded服务embededCanalServer.start()
Prev Flume
Sink groupsprocessor.type = failover
发送Req
发送消息,让sink可以消费到这条binlogdisruptorMsgBuffer.publish(next);
移动get指针到ack指针rollback
写入
创建
2、启动所有instancefor{ ServerRunningMonitor.start() }
parser拉取
City = BJ
channel删除数据
HBasepriority = 10
获取指定的ChannelgetChannel()
HDFS
事务启动Transaction#begin
4、业务处理
将事务放入当前线程
FileChannel
Agent进程
创建内嵌serverCanalServerWithEmbedded.instance()
发送成功?
KafkaSink
开始执行replication1. 构造ErosaConnection连接2. 启动一个心跳线程3. 执行dump前的准备工作4. 获取最后的位置信息
sink过滤、分发路由、聚合
CanalController canal调度控制器
1 放入
Log日志组件#putprotoBuf序列化event封装为Put对象
JSONHandler#getEvents转换为JsonEvent集合
ACK
优先找priority大的sink
维护channel -> events映射关系span style=\"font-size: inherit;\
Canal client
若处理失败就找其他轮训的sink
执行具体put操作doPut(List)
Channel#getTransaction
注册服务临时节点
AbstractEventParser#consumeTheEventAndProfilingIfNecessary
放入putList
Channel#put
FileChannel.FileBackedTransaction#doCommit
配置定时拉取线程远程配置canal.properties修改重新加载整个应用
Log#commitPut追加写入一个Commit对象表示之前写入的event事务提交完成
Nginx
file
根据event指针从log文件中获取eventlog.get(ptr)
getWithoutAck ()ack()rollback()
http
Y
排他启动
processStart()
当前事务ThreadLocal<BasicTransactionSemantics>
transactionBuffer.start()配置transaction buffer初始化缓冲队列
启动时回调embededCanalServer.start(destination)
所有请求
一条数据分别发给两个channel
根据transactionID获取event指针,并且从channel提交到的全局queue中出队FlumeEventQueue#removeHead
本地文件
Hivepriority = 5
0 条评论
回复 删除
下一页