Zookeeper Leader 选举源码
2022-01-21 10:11:07 0 举报
Zookeeper leader 选举源码流程图、源码分析
作者其他创作
大纲/内容
logicalclock.incrementAndGet();选举周期加1
LearnerHandler fhfh.start();开启线程处理接收的数据
SendWorker.run()
recvQueue.add(msg);
Vote current = self.getCurrentVote();获取本机选举出的leader选票
this.mySid == sid
sendNotifications();sendqueue.offer(notmsg);将选票发送到应用层的发送队列
while (running)switch (getPeerState())当前选举状态
自己胜
QuorumCnxManager.Listener listener = qcm.listener; listener.start();创建socket连接,接收启动时连接端发送的ip myid prot
FastLeaderElectionmakeLEStrategy().lookForLeader()开始执行选举逻辑
leader通信断开后,flower判断leader宕机
run()
LOOKING
manager.connectAll();第一次启动时,n 肯定没有选票,会进行创建连接发送选票
发送到socket
sid self.getId()
receiveConnection(client);接收对方连接创建输入流
FOLLOWINGLEADING
((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));Leader投票逻辑
处理选票
sendNotifications();发送选票
this.mySid != sid
s = ss.accept();接收follower连接数据
while (this.isRunning()) { readPacket(qp); processPacket(qp); }
fle.start();
listener.run()
setLeader(null);updateServerState();更新节点状态为LOOKING重新进行leader选举
LEADING
对方胜
这种情况可能是自己后加入集群选举或者是网络中断加入集群选举。这时其他节点已经选举了几轮,这里直接更新当前的选举中期logicalclock.set(n.electionEpoch);recvset.clear();
setLeader(makeLeader(logFactory));
self.getPeerState() == QuorumPeer.ServerState.LOOKING
n.electionEpoch == logicalclock.get()
建立收发选票socket建立收发选票工作线程建立收发选票工作队列
switch (n.state)判断远端设备选举状态
for(Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements();){ sid = en.nextElement(); connectOne(sid); }
connectOne(sid);
ss = new ServerSocket(); ss.bind(addr); client = ss.accept();创建ServerSocket()阻塞等待对方连接
n.electionEpoch
voteSet.hasAllQuorums();
sendqueue.offer(notmsg);
FOLLOWING
WorkerReceiver.run()
startLeaderElection();初始化集群选举相关数据
同步数据给从节点
startZkServer();
leader.lead();
setFollower(makeFollower(logFactory));
放到RecvQueue
this.messenger.start();this.wsThread.start();this.wrThread.start();
如果远端为选举状态且 远端选举周期小于当前选举周期将选票回发给发送端
Leader 核心选举逻辑
zk.loadData();初始化zk数据
循环与leader通信
follower.followLeader();
(ackSet.size() > half);
RecvWorker.run()
send(b);
return endVote;返回节点状态
如果当前是选举状态
recvqueue.offer(n);将选票放入recvqueue
WorkerSender.run()
termPredicate这种情况般是新机器加入已经选举出leader的集群,逻辑和主主线选举逻辑类似
makeLeader(logFactory)
ss = new ServerSocket(self.getQuorumAddress().getPort());
n.electionEpoch logicalclock.get()
ackstate == QuorumPeer.ServerState.LOOKING如果对方处于选举状态将本机认为的leader 回发给对方
远端建立连接
quorumPeer.start();选举入口
创建ServerSocket
break;新节点加入集群,这种选票直接废弃
cnxAcceptor.start();
for (long sid : self.getCurrentAndNextConfigVoters())
createElectionAlgorithm(electionType);创建选举相关工作线程和队列
super.start();
如果当前不是选举状态,已经选出了leader
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());如果本机的id和选举出的leader id 一样本机设置为leading 否则为following
n==null
while ((self.getPeerState() == ServerState.LOOKING) && (!stop))循环进行leader选举
process(m);
recvQueue.add(msg);如果当前sid和远端发送id一致将选票发送到recvQueue
过半选举机制
这里是与另一台zK leader 建立连接
0 条评论
下一页