ZK的ZAB协议源码
2025-03-18 15:54:45 0 举报
Zookeeper的ZAB协议源码流程图
作者其他创作
大纲/内容
实例化WatcherEvent对象,调用它的deserialize方法反序列化接收到的数据
Leader
SendThread的run方法
PrepRequestProcessor
是写事件,向服务端发送请求数据
调用AckRequestProcessor的processRequest方法,将请求转发给下一个处理器
FinalRequestProcessor
将数据包通过之前建立的集群交互通道发送给所有的学习者
调用SocketChannel的write方法向通道发送请求数据
FollowerRequestProcessor
将Request对象保存到committedRequests已提交未写入内存集合中
一阶段提交的数据包和接收响应都是在这里进行处理的
调用registerAndConnect方法注册到多路复用器上
调用startSendingPackets方法异步将queuedPackets队列中的数据包发送给学习者
主节点一阶段提交,写入磁盘
负责一阶段提交
将上面的数据封装为Request对象,调用SyncRequestProcessor的processRequest方法
一阶段提交
调用receiveMessage方法接收消息
进入死循环,接收从节点发送的数据并反序列化为QuorumPacket对象
二阶段内存持久化
遍历所有的从节点,调用LearnerHandler的queuePacket方法,将数据包添加到queuedPackets队列中
调用sendPacket方法发送数据包
唤醒后会调用processCommitted方法,处理已经完成两阶段提交的请求
这里会将主节点自己的的 ACK 也添加到ackset集合中,因为两阶段提交,第二阶段需要半数以上的节点返回 ACK 响应才能进行,假设当前有3个节点组成的服务,主节点将提议发送给其他节点后,如果只能收到一个从节点的 ACK 响应也能执行第二阶段的提交操作,原因就是将自己也作为一个 ACK 响应的节点
调用WatchManager的triggerWatch方法触发监听回调
负责和学习者进行交互
构造提交数据包QuorumPacket对象,类型是Leader.COMMIT
LinkedBlockingQueue<Request> committedRequests
调用CommitProcessor的processRequest方法,将请求转发给下一个处理器
根据请求类型构造对应的请求对象,调用ProposalRequestProcessor的processRequest方法,将请求转发给下一个处理器
LinkedBlockingQueue<Request> queuedRequests
执行queuedRequests.add(request)将请求保存到队列中
调用Follower的followLeader跟随主节点
将具体化得到的观察者集合和事件封装为WatcherSetEventPair对象,添加到waitingEvents队列中
调用ClientCnxn的submitRequest方法提交请求
一阶段提交【重要】
负责主节点的一阶段磁盘持久化
SyncRequestProcessor
如果收到主节点的响应,并且数据包类型是Leader.LEADERINFO,则进行早期同步工作
EventThread
调用SocketChannel的connect方法建立连接
ProposalRequestProcessor
SendAckRequestProcessor
CommitProcessor
二阶段提交
调用findSendablePacket方法,从outgoingQueue队列中获取需要发送的数据包
调用wakeup方法,内部调用了notifyAll方法,唤醒之前等待二阶段提交完成的CommitProcessor线程
ZooKeeperThread和FileTxnSnapLog负责将数据树和会话持久化到磁盘
主节点二阶提交,写入内存
SendThread
实例化WatchedEvent对象,将上面实例化的WatcherEvent传递进去
负责从节点的一阶段磁盘持久化
异步执行ScheduledWorkRequest的run方法
调用ClientCnxnSocketNIO的connect方法建立连接
接收请求
负责从节点的二阶段内存持久化
如果数据包类型是Leader.PROPOSAL
调用DataTree的processTxn方法,写入树形结构的内存中
调用ClientCnxnSocketNIO的doTransport方法
接收请求响应
调用inform方法,将数据同步给观察者
调用Packet的createBB方法序列化数据并创建ByteBuffer
调用FollowerZooKeeperServer的startup方法启动从节点服务
一阶段磁盘持久化
调用ToBeAppliedRequestProcessor的processRequest方法,将请求转发给下一个处理器
调用SendThread的start方法和EventThread的start方法启动两个线程
Follower
调用CommitWorkRequest的doWork方法
SyncRequestProcessor是个线程类,之前手动启动了该线程,会异步执行run方法
连接建立后会调用SendThread的primeConnection方法
数据处理入口
将Request对象封装为CommitWorkRequest对象,调用WorkerService的schedule方法
发送一阶段的Proposal和二阶段的提交信息
将CommitWorkRequest再封装为ScheduledWorkRequest对象,并提交到线程池中它是个线程对象,启动该线程
调用commit方法通知从节点将数据写入内存完成数据同步
调用CommitProcessor的commit方法,执行写入内存的操作
从自身的queuedRequests队列中获取Request请求,调用CommitProcessor的processRequest方法,将请求转发给下一个处理器
服务处理服务端响应的事件,包括监听器方法的回调
只要从节点服务在运行并且没有异常发生就会执行的循环,调用readPacket方法读取主节点发送的数据包
Client
实例化ZooKeeper客户端对象
调用Leader的propose方法准备一阶段提交
将Proposal对象保存到Leader的outstandingProposals未同步集合中
响应请求
调用syncWithLeader方法,根据上一步得到的最近处理的事务请求编号,将历史数据与主节点进行同步
加同步锁执行selector.selectedKeys()获取事件集
CommitProcessor是个线程类,之前手动启动了该线程,会异步执行run方法
从outstandingProposals中获取之前保存的Proposal,调用addAck方法保存从节点的响应信息,即添加到响应当前节点对应的ackset集合中
调用SocketChannel的register方法将SocketChannel注册到Selector多路复用器上,并对SelectionKey.OP_CONNECT事件感兴趣
建立连接后,需要同步主节点最近处理的事务请求编号和Epoch,防止脑裂
CnxnChannelHandler处理程序在和客户端建立的通道激活状态下会调用channelActive方法,这里会创建NettyServerCnxn类,类中包含了LeaderZooKeeperServer和NettyServerCnxnFactory对象,然后将NettyServerCnxn绑定到通道的CONNECTION_ATTRIBUTE属性值中,后面读写请求就可以拿到这个属性值,交由LeaderZooKeeperServer来处理写请求
调用pRequest方法处理请求
调用PrepRequestProcessor的processRequest方法,将请求转发给下一个处理器
将本服务的Epoch设置为主节点返回的值,再构造QuorumPacket数据包,类型是Leader.ACKEPOCH,发送给主节点通知主节点已完成同步
调用processCommitted处理已经完成两阶段提交的数据
调用queuePacket方法创建对应的Packet
sendThread.getClientCnxnSocket().packetAdded()
从自身queuedRequests队列中获取等待被持久化到磁盘中的Request
调用多路复用器Selector的select方法等待事件发生
进入循环,获取LeaderZooKeeperServer对象,调用它的processPacket方法将数据交由主节点处理
异步执行LearnerHandler的run方法
调用CommitProcessor的commit方法,将Request请求添加到committedRequests队列中
将Packet添加到outgoingQueue队列中
调用FollowerZooKeeperServer的setupRequestProcessors方法构造请求处理链
对SelectionKey.OP_READ事件和SelectionKey.OP_WRITE事件感兴趣
将Request对象保存到资深的queuedRequests队列中
调用FinalRequestProcessor的processRequest方法,将请求转发给下一个处理器
构造提交数据包QuorumPacket对象,类型是Leader.INFORM
创建ZooKeeperThread线程任务,调用LeaderZooKeeperServer的takeSnapshot方法将数据树和会话保存到快照文件中
调用create方法创建节点
AckRequestProcessor
调用Leader的processAck方法,接收从节点的 ACK 响应并准备二阶段提交
如果是WatcherSetEventPair事件,则遍历所有的Watcher,回调process方法
调用ZKDatabase的processTxn方法写入内存数据库
ToBeAppliedRequestProcessor
LinkedBlockingQueue<Request> submittedRequests
从queuedRequests队列中获取等待被持久化到磁盘中的Request
是读事件,收到服务端发送的响应数据
FollowerRequestProcessor是个线程类,之前手动启动了该线程,会异步执行run方法
调用Packet的notifyAll方法唤醒等待请求完成的阻塞线程
将消息封装为Request对象,调用submitRequest方法提交请求
调用tryToCommit尝试二阶段提交
执行committedRequests.poll()获取待写入内存的Request
调用SendAckRequestProcessor的processRequest方法,将请求转发给下一个处理器
调用ClientCnxnSocketNIO的packetAdded方法,即Selector的wakeup方法,唤醒阻塞监听通道事件发生的线程
LinkedBlockingQueue<Request> committedRequests已完成两阶段提交但尚未写入内存的请求
超过半数以上的成员返回ACK响应才能执行二阶段提交的逻辑
构造QuorumPacket数据包,类型是Leader.ACK,调用Follower的writePacket方法,将一阶段响应发送给主节点
调用ZxidUtils的getEpochFromZxid方法,将主节点返回的最近处理的事务请求编号解析为Epoch值
AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE = AttributeKey.valueOf(\"NettyServerCnxn\")
进入死循环,执行waitingEvents.take()获取事件
发送从节点的ACK响应
调用sendObserverPacket方法,遍历所有的观察者,调用LearnerHandler的queuePacket方法,将数据包添加到queuedPackets队列中
客户端发送的请求会被CnxnChannelHandler的channelRead方法接收
调用EventThread的queueEvent方法
调用sendToNextProcessor方法发送给下一个处理器
font color=\"#ff0000\
如果没有完成两阶段提交,会调用wait()方法进行等待
执行queuedRequests.add(request)将请求保存到阻塞队列中
调用finishPacket方法,处理读取完成后的逻辑
根据客户端操作的命令执行不同的逻辑,如果是OpCode.create命令,则会调用createNode方法
调用processEvent方法处理事件
调用ClientCnxn的start方法
负责将客户端的请求封装为对应的请求处理类,并发送到责任链中进行处理
调用LeaderRequestProcessor的processRequest方法责任链模式处理消息
构造TxnHeader对象,调用SerializeUtils的deserializeTxn反序列化数据得到Record对象,调用FollowerZooKeeperServer的logRequest方法执行一阶段提交的逻辑
LinkedBlockingDeque<Packet> outgoingQueue
负责和服务端连接连接并和服务端进行交互
配置Watcher监听器并调用ClientCnxnSocketNIO的connectionPrimed方法
调用SyncRequestProcessor的processRequest方法,异步将请求数据同步到快照日志和事务文日志文件中
发送请求
Observer
遍历所有的事件,调用doIO方法处理事件
除了主节点,从节点和观察者节点都是学习者
调用NettyServerCnxn的processMessage方法处理客户端消息
调用createSock方法创建SocketChannel对象
调用connectToLeader方法和主节点建立Socket连接
调用SendThread的readResponse方法读取响应
PrepRequestProcessor是个线程类,之前手动启动了该线程,会异步执行run方法
LinkedBlockingQueue<Object> waitingEvents
调用processPacket方法处理数据包
二阶段提交【重要】
调用LeaderZooKeeperServer的processTxn方法处理事务请求
如果数据包类型是Leader.COMMIT
真正的负责处理从节点的二阶段内存持久化
LinkedBlockingQueue<Request> queuedRequests
执行submittedRequests.add(request)将客户单请求保存到阻塞队列中
调用registerWithLeader方法构造QuorumPacket数据包,类型是Leader.FOLLOWERINFO,通知主节点自己的身份
收到客户端的请求后:1、向所有从节点发送Proposal信息,进行一阶段提交2、将数据写入本地数据文件,快照文件和日志文件等3、将自己作为ACK响应保存到集合中4、阻塞等待从节点的ACK响应,如果超过半数以上的节点进行了响应,则进行二阶段提交5、向所有从节点发送Commit信息,通知从节点将之前的数据变更持久化到内存数据库中6、向所有观察者发送变更的数据进行数据同步7、将变更的数据持久化到自己的内存数据库中8、响应客户端请求
有些逻辑没看,目前来看主要是负责转发请求
监听是一次性的,触发完成就会从服务端删除
调用FinalRequestProcessor的processRequest方法
负责主节点的二阶段内存持久化
调用Packet的wait方法,阻塞线程直到请求完成
调用startConnect开始和服务端建立连接
调用ZKWatchManager的materialize方法,根据事件具体化观察者
EventThread的run方法
调用FollowerZooKeeperServer的commit方法执行提交
判断数据包的类型,如果是Leader.ACK响应数据,则调用Leader的processAck方法,接收从节点的 ACK 响应并准备二阶段提交
真正的负责处理主节点的二阶段内存持久化和响应客户端
调用FollowerZooKeeperServer的processTxn方法处理事务请求
收到主节点的Proposal信息后:1、执行一阶段提交,将数据写入本地数据文件,快照文件和日志文件等2、向主节点发送ACK响应3、阻塞等待主节点的Commit信息,执行二阶段提交, 将变更的数据持久化到自己的内存数据库中
调用Proposal的hasAllQuorums方法,校验收到的ACK响应是否超过半数成员
构造提议数据包QuorumPacket对象,类型是Leader.PROPOSAL,再构造Proposal对象,将QuorumPacket对象赋值给Proposal对象的packet属性
执行submittedRequests.take()阻塞获取客户端的Request请求
调用ServerCnxn的sendResponse方法,通过通道向客户端发送响应数据
从通道的CONNECTION_ATTRIBUTE属性值中获取之前绑定的NettyServerCnxn对象
负责给主节点发送一阶段提交的响应
调用WatchRegistration的register方法注册观察者
LeaderRequestProcessor
HashSet<Long> ackset集群中响应当前节点的服务编号集合
负责将主节点自己作为响应的节点添加到ackset集合中
调用NettyServerCnxn的sendResponse方法响应客户端请求
0 条评论
下一页