zookeeper集群启动及Leader选举流程图
2023-09-16 11:55:29 0 举报
呕心沥血制作的流程图,如果有错误麻烦联系1427213369@qq.com,谢谢
作者其他创作
大纲/内容
Y
收到了新选票,再来一轮
查看是否收到了新的选票
N
recvqueue.put(n);
n.state
蓝色线逻辑到达,此处逻辑激活继续
默认false
清除快照任务默认参数protected int snapRetainCount = 3;protected int purgeInterval = 0;
getInitId()
parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
getInitLastLoggedZxid()
this.mySid == sid
默认使用NIO使用netty加启动参数-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
quorumPeer.setMyid(config.getServerId());
getAllMembers
put/offer
所有Observr节点
org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader
开始选举
Exception-
ManagedUtil.registerLog4jMBeans()
连接socket
org.apache.zookeeper.server.quorum.QuorumPeerConfig
逻辑暂停
getPeerEpoch
加载数据到内存
org.apache.zookeeper.server.quorum.QuorumPeer#start
此段代码逻辑结束回退至...
quorumPeer.setCnxnFactory(cnxnFactory);
response == null
Boolean.getBoolean(\"readonlymode.enabled\")
将发送的数据放进BlockingQueue中
梦开始的地方
return new QuorumCnxManager(...
参数:D:\\selvterProject\\zookeeper\\conf\\zoo_1.cfg
激活下处逻辑
quorumPeer.setElectionType(config.getElectionAlg());
写入顺序 requestBuffer.putInt(state); requestBuffer.putLong(leader); requestBuffer.putLong(zxid); requestBuffer.putLong(electionEpoch); requestBuffer.putLong(epoch); requestBuffer.putInt(Notification.CURRENTVERSION); requestBuffer.putInt(configData.length); requestBuffer.put(configData);
启动配置文件中添加配置dataDir=D:\\\\tmp\\\\zookeeperserver.1=localhost:2888:3888server.2=localhost:2889:3889需要注意,在dataDir配置后的目录,需要指定当前启动的myid方法是创建一个文件名为“myid\
org.apache.zookeeper.server.quorum.FastLeaderElection#getVoteTracker
将选票箱中和本次选举的leader一致的票放进voteSet中并返回
run()
在发送数据到sendqueue后sendqueue逻辑也被激活
org.apache.zookeeper.server.quorum.QuorumPeer#run
org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.ListenerHandler#acceptConnections
启动leader选举监听端口服务端3888
PK选票
启动连接线程
this.electionAlg = createElectionAlgorithm(electionType);
n.electionEpoch > logicalclock.get()
发送选票
将leader的选票存到选票箱
被动连接
初始化选票第一票投自己
默认状态LOOKING
poll/take
this.messenger.start();
getObservingMembers
LOOKING
quorumPeer.initConfigInZKDatabase();
获取选票
开始连接
主动连接
默认是不调用的,需要配置purgeInterval 大于0
voteSet.hasAllQuorums()
getVotingMembers
this.recvQueue.offer(msg);
loadDataBase()
org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.ListenerHandler#run
logicalclock.set(n.electionEpoch); recvset.clear();
org.apache.zookeeper.server.quorum.QuorumCnxManager.QuorumConnectionReqThread#run
初始化lisener
快速选举相关算法
org.apache.zookeeper.server.quorum.QuorumPeerMain
判断自己是不是可以投票&判断自己投票的leader是否可以被选举
config.parse(args[0]); purgeMgr.start();
所有可以投票的节点
purgeMgr.start();
org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
选举周期+1
获取最大zxid
offer后,该处逻辑被激活,同步进行
验证leader票数是否超过半数
org.apache.zookeeper.server.quorum.QuorumCnxManager#recvQueue
QuorumCnxManager qcm = createCnxnManager();
初始化内存数据库对象
this.wsThread.start();
所有的节点
(ackSet.size() > half);
判断收到的选票和自己的选票谁做leader
startLeaderElection();
clear
send
org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender#run
1.Epoch谁大谁做leader2.zxid谁大谁做leader3.myid谁大谁做leader
updateProposal()
serverSocket = createNewServerSocket(); client = serverSocket.accept();
this.wr = new WorkerReceiver(manager);
manager.connectAll();
直接放到recvQueue队列
if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey()); }
reconfigFlagClear();
核心选票逻辑
closeSocket(sock);
quorumPeer = getQuorumPeer();...设置一系列属性
如果我连接的机器id比我大则关闭
只能大连小
getPeerState()
构建消息
解析参数清除快照任务
ServerCnxnFactory.createFactory();
org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
n.electionEpoch < logicalclock.get()
offer后,同时跳到这里
while (running)
sid < self.getMyId()
将选出来的leader信息更新
break;
LinkedBlockingQueue<Notification> recvqueue
org.apache.zookeeper.server.NettyServerCnxnFactory
listener.start();
第一次会收到自己发给自己的选票也许会为空,自己发给自己的选票也可能还在路上
从response中读取选票数据
连接其它机器
setCurrentVote(makeLEStrategy().lookForLeader());
create
org.apache.zookeeper.server.quorum.QuorumPeer#createElectionAlgorithmcase:3
获取自己的myid
接收的选票周期小于自己的选票周期说明投票的机器刚加入集群忽略选票即可
初始化投票参数
QuorumCnxManager.Listener listener = qcm.listener;
qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
给所有可以投票的节点发送本次自己的选票
org.apache.zookeeper.server.quorum.FastLeaderElection#sendNotifications
run();
同步到自己的选举周期清除之前的选票箱
hVZThVyacWCrAcR90IFV7Z1XgzUKuPG9
recvqueue.offer(n);
接收的选票周期大于自己的选票周期说明自己刚加入集群
org.apache.zookeeper.server.quorum.FastLeaderElection#start
listener = new Listener(); listener.setName(\"QuorumPeerListener\");
this.wrThread.start();
判断是不是发送给自己的
将选票放到收票箱
checkSuspended();
this.ws = new WorkerSender(manager);
org.apache.zookeeper.server.quorum.QuorumCnxManager#handleConnection
处理连接
initializeAndRun()
节点状态LOOGING、FOLLOWING、LEADING、OBSERVING
返回
org.apache.zookeeper.server.quorum.QuorumCnxManager#toSend
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
cnxnFactory.start();
初始化服务端连接对象
连接我的机器sid比我小则关闭
org.apache.zookeeper.server.quorum.QuorumCnxManager#queueSendMap
读取D:\\tmp\\zookeeper\\1\\version-2\\\\currentEpoch文件中的值
n==null
和其它机器建立连接
leader选举出来了!
设置当前节点状态
!validVoter(response.sid)
runFromConfig(config);
org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener#run
currentEpoch = epochOfZxid;
选举的leader是observer
绑定2181端口客户端进行通信
receiveConnection(client);
connectOne(sid);
lookForLeader()
return endVote;
具体的选票PK逻辑
sendqueue.offer(notmsg);
logicalclock.incrementAndGet();
启动lead选举
org.apache.zookeeper.server.quorum.QuorumPeer#startLeaderElection
quorumPeer.start();
reconfigFlag=false控制getPeerState()的参数具体见:org.apache.zookeeper.server.quorum.QuorumPeer#updateServerState
offer/put
Y
选票人和选票leader都是可以投票的
前边两个逻辑都不符合,说明选举周期一样选票pk
这部分具体内容参考主动连接部分
return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
清除收到的选票
n == null
LinkedBlockingQueue<ToSend>sendqueue
setupMyId();
validVoter(n.sid) && validVoter(n.leader)
while ((self.getPeerState() == ServerState.LOOKING) && (!stop))
org.apache.zookeeper.server.quorum.FastLeaderElection#recvqueue
process(m);
这段具体看下代码,这个判断只是选出来leader,都会执行updateProcesal()方法的,只是传参是这个判断出来的leader
检查心跳
org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeadercase:LOOKING
注册JMX
org.apache.zookeeper.server.quorum.QuorumPeer.ServerState#LOOKING
检查sid是否可以投票
sendNotifications();
sid > self.getMyId()
org.apache.zookeeper.server.quorum.flexible.QuorumVerifier
startServerCnxnFactory();
org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver#run
leaveInstance(endVote);
org.apache.zookeeper.server.quorum.QuorumCnxManager#connectAll
0 条评论
下一页