zookeeper选举过程源码流程图
2024-03-05 20:20:57 0 举报
文件类型:流程图 文件描述:该文件描述了ZooKeeper的选举过程源码流程图。ZooKeeper是一个开源的分布式协调服务,用于维护和监控分布式系统的一致性。在ZooKeeper集群中,选举过程是一个非常重要的环节,它决定了哪个节点成为领导者。流程图详细展示了选举过程中的各个阶段和操作,包括候选人的确定、选票的收集和统计、领导者的选举等。这个流程图对于理解ZooKeeper的选举过程非常有帮助,可以帮助开发者了解ZooKeeper的内部工作原理。
作者其他创作
大纲/内容
初始化选举对象
super.startup()
makeLEStrategy().lookForLeader()
当前节点主动发起连接sid较小的节点
从接收选票队列中取出选票
cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start()
接收到的选票放到选票箱
在选举端口BIO阻塞监听
LeaderZooKeeperServer.setupRequestProcessors()
recvqueue.offer(n)
updateServerState()
self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState())
设置选举算法默认为3
注册JMX
如果要使用netty通信需要加上参数-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
sendqueue = new LinkedBlockingQueue<ToSend>(); recvqueue = new LinkedBlockingQueue<Notification>();this.messenger = new Messenger(manager)初始化两个队列
根据节点状态处理不同逻辑
sendqueue.offer(notmsg)
type=a_.readInt(\"type\"); zxid=a_.readLong(\"zxid\"); data=a_.readBuffer(\"data\")
setLeader(makeLeader(logFactory))
runFromConfig(config)
leader给所有follower发送ping请求保持长连接
同步leader数据
判断校验选票
connectOne(sid)
清理快照文件
false自己胜
startZkServer()
//更新周期logicalclock.set(n.electionEpoch);//清空选票箱recvset.clear();
sendNotifications()
process(m)
初始化内存数据库
选举周期+1
初始化服务端连接对象(监听2181端口)zk默认使用NIO官方推荐使用netty
给所有参与选举的节点发送选票
leader挂了,抛出异常,即连接通道断开
加载数据
启动netty服务
logicalclock.incrementAndGet()
manager.connectAll()第一次启动没有选票,需要跟其他节点建立连接
return endVote
ss = new ServerSocket()ss.bind(self.getQuorumAddress())
接收选票线程
setFollower(makeFollower(logFactory))
在pk完之后再看下是否还有新的选票加入,如果有再进行一次pk
din.readLong()
n==null
leader.lead()
recvQueue.add(msg)
purgeMgr.start()
QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); processPacket(qp); }
adminServer.start()
注册自己到leader
startServerCnxnFactory()
读取发送选票节点的sid
已经选出leader
voteSet.hasAllQuorums()
LOOKING
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid)
启动选举监听
同步数据给follower
if (sid self.getId())
quorumPeer.setElectionType(config.getElectionAlg())
启动监听follower
ss = new ServerSocket()ss.bind(addr)client = ss.accept()
zk.loadData()
zk.startup()
处理选票
s = ss.accept()
一般不会发生
设置服务连接对象
true其他机器选票胜更新自己的选票为其他机器的选票
服务启动
quorumPeer.start()
this.wsThread.start()
发送选票线程
((newEpoch > curEpoch) ||((newEpoch == curEpoch)&& ((newZxid > curZxid) || ((newZxid ==curZxid) && (newId > curId)))))
QuorumPeerMain.main服务端启动类
获取选票
while死循环接收leader数据
BinaryInputArchive.readRecord()
更新状态
RecvWorker.run()
SendWorker.run()
取出数据反序列化
LEADING
n.electionEpoch logicalclock.get()
loadDataBase()
config.parse(args[0])
switch (getPeerState())
n.electionEpoch == logicalclock.get()
是
getQuorumPeer()
closeSocket(sock)
LOOKING选举主线逻辑
readPacket(qp)
startLeaderElection()
setPeerState(ServerState.LOOKING)
while循环发送选票
n.electionEpoch logicalclock.get()接收选票的周期大于自己的周期,即有可能是当前机器后面才加入,其他机器已经选举好几轮了
从发送队列中取出选票
if (sid == self.getId())
this.messenger.start()
syncWithLeader(newEpochZxid)
与leader建立连接
for (LearnerHandler f : getLearners()) { f.ping(); }
放入到真正要发送出去的队列中
投票超过半数
break
核心启动流程
否
LearnerCnxAcceptor.run()
选举pk逻辑
给其他参与选票的节点发送选票
receiveConnection(client)
QuorumPeer.run()
初始化选举管理器
sid self.getId())开启接收选票
QuorumConnectionReqThread.run()
初始化选票(自己)
接收连接请求
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING)
ManagedUtil.registerLog4jMBeans()
LEADER/FOLLOWER这种情况一般是已选出LEADER,有新节点加入,处于LOOKING状态会先投票给自己,其他节点收到后会放回LEADER的选票给新机器
启动内嵌jetty服务默认端口是8080用来查看节点状态信息
发送选票的节点sid小于当前机器的sid则关闭连接,为了避免重复的socket连接,zk不允许小的sid连接大的sid
执行finally
对方节点已经选出leader
FOLLOWING
this.mySid == sid
main.initializeAndRun
ServerCnxnFactory.createFactory()
加载文件数据到内存
从真正待发送的队列中取出要发送的选票
NettyServerCnxnFactory
createElectionAlgorithm(electionType)
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()))
super.start()
(ackSet.size() > half)
发送选票(选自己)
如果选举的结果和自己的sid一样则是leader,否则是follower或者observer
send(b)
fh.start()
集群已经选举好了leader,如果这时候有其他节点加入
建立连接
放入到自己的接收队列中
this.mySid != sid
quorumPeer.setCnxnFactory(cnxnFactory)
parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel()
WorkerSender.run()
获取节点对象
是本机还在选举状态
绑定端口启动
NettyServerCnxnFactory
registerWithLeader(Leader.FOLLOWERINFO)
createCnxnManager()
while ((self.getPeerState() == ServerState.LOOKING) &&(!stop))当前节点是LOOKING状态,会一直接收其他节点的选票
接收选票放入队列中
Vote current = self.getCurrentVote()获取本机状态,一般是leader
QuorumCnxManager.Listener
listener.run()
QuorumServer leaderServer = findLeader()
this.wrThread.start()
解析配置文件加载到内存
listener.start()
if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get()))如果发送方选票周期小于当前节点选票周期把自己pk的选票发送回去
WorkerReceiver.run()
follower.followLeader()
fle.start()
获取leader信息
connectOne(sid)
0 条评论
下一页