zk启动流程
2023-02-05 13:57:20 0 举报
zk
作者其他创作
大纲/内容
for循环选票箱,与自己的选票相同时,sid放入到voteSet
##FastLeaderElectionlogicalclock.incrementAndGet();
启动服务(nio/netty)
设置选举类型默认是3
在接收到选票的时候就给发选票的机器建立一个选票发送器线程,用于将来发送选票,启动发送线程
LOOKING
pk逻辑
添加到sid机器的发送队列中
如果发送选票的机器是选举状态
sendNotifications();
当前机器是选举状态时,不断循环从应用层接收队列取数据
ServerCnxnFactory cnxnFactory=ServerCnxnFactory.createFactory();cnxnFactory.configure();
启动快速选举算法相关线程
初始化集群选举 leader相关对象数据
//设置leaderself.setPeerState()//清空应用层接收队列recvqueue.clear();
过半数选举逻辑
##加载zoo.cfg文件到内存QuorumPeerConfig config = new QuorumPeerConfig();config.parse(args[0]);
选举周期+1
获取本地服务节点对象
注册jmx
getQuorumPeer();
recvQueue.add(msg);
初始化内存数据库对象
readPacket(qp);
ackstate == QuorumPeer.ServerState.LOOKING)
保存pk后的选票
启动应用层接收选票线程
self.getPeerState() == QuorumPeer.ServerState.LOOKING
voteSet.hasAllQuorums();
初始化zk数据
startLeaderElection();
this.wsThread.start();
启动服务节点
while循环发送选票
true,已过半
while ((self.getPeerState() == ServerState.LOOKING) && (!stop))
初始化服务端连接对象默认nio使用netty可配置变量zookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
sid < self.getId()
super.start();
是
n.state
Vote v= makeLEStrategy().lookForLeader()
sid == self.getId()
while((n = recvqueue.poll(200毫秒)) != null)if(totalOrderPredicate())recvqueue.put(n);break;
while循环
重新选举
main.initializeAndRun(args)
(ackSet.size() > half);
syncWithLeader(newEpochZxid);
n==null,没有新选票
bug
wilte循环判断是否有新选票,有新选票需要重新pk,新选票赢了,重新选举
for (LearnerHandler f : getLearners())f.ping();
zk只允许id大的机器连接id小的机器,避免出现双向连接
quorumPeer.setElectionType(config.getElectionAlg());
否,本机以选出leader
totalOrderPredicate
##zk启动类QuorumPeerMain.mian
sendqueue.offer(notmsg);
发送选票的机器id
开始接收选票
protocolVersion = din.readLong();
cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start();
##QuorumCnxManager.listenerlistener.start();
recvqueue.offer(n);
registerWithLeader(Leader.FOLLOWERINFO)
zk.startup();
接收选票的选举周期小于本机器的选举周期n.electionEpoch < logicalclock.get()
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState());
s = ss.accept();
建立连接
注册自己到leader
//更新本机器的选举周期logicalclock.set(n.electionEpoch);//清空选票箱 recvset.clear();
//启动容器节点清除线程if (containerManager != null)containerManager.start
FollowerZooKeeperServer
//阻塞200毫秒Notification n = recvqueue.poll(200);
((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
启动应用层发送选票线程
接收到的选票放到选票箱
启动数据同步线程
follower.followLeader();
FOLLOWINGLEADING
startZkServer();
判断是否是选举状态
quorumPeer.start();
while循环接收数据
初始化选举管理器
WorkerReceiver.run()
同步数据给从节点
loadDataBase();
解析sid机器发送的数据,封装到message中
this.mySid == sid
接收的选票与本机器的选票pk
setCurrentVote(v);
选票接收机器就是自己,放到自己接收选取的队列
receiveConnection(client);
初始化选票选自己
zk.loadData();
n.electionEpoch = logicalclock.get()
处理选举信息
如果发送选票的机器是选举状态,并且选举周期小于本机器
判断是否过半
setFollower(makeFollower(logFactory))
if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey();}
是否集群启动
sid > self.getId()
process(m);
把自己pk后的选票发给发送选票的机器
connectToLeader();
启动发送线程
选票放在recvQueue队列
true:接收的选票赢false:本机器赢发送pk后的选票
发送选票
初始化选票
ss = new ServerSocket();
true:接收选票赢了,更新自己的选票信息(选举对方),发送选票
放弃选票
没有选票
n == null
SendWorker.run()
closeSocket(sock);connectOne(sid);
quorumPeer.run()
queuedPackets.add(p);
出现异常,跳出循环,执行finally
while循环发起ping
单机
this.wrThread.start()
while循环从传输层接收队列里获取选票
##QuorumCnxManager.listenerlistener.run();
startServerCnxnFactory();
发送选票的机器id小于当前机器,关闭连接,主动连接sid的机器
QuorumPack.etserialize
已选出集群leader
updateServerState()
ManagedUtil.registerLog4jMBeans();
同步leader数据
RecvWorker.run()
ZooKeeperServerMain.main(args);
启动选举bio连接,监听选举端口
BinaryOutputArchive.writeRecord
接收选票的选举周期大于本机器的选举周期n.electionEpoch > logicalclock.get()
集群
//阻塞三秒response = manager.pollRecvQueue();
leader.lead();
##Leaderss = new ServerSocket();
接收连接
queue.add(buffer)
##JettyAdminServeradminServer.start();
tcp的保活时间:tickTime * syncLimit
createCnxnManager()
与leader建立连接,同步数据
if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get()))
选票pk接收的选票与自己pk
处理选票
FOLLOWING
启动选举监听线程
WorkerSender.run()
//启动请求处理链线程PrepRequestProcessor.run
加载文件数据到内存
##FastLeaderElection.start()Messenger.start();
有选票判断选票发送机器的状态
//本机的选票,一般是leaderVote current = self.getCurrentVote();
##QuorumCnxManagermanager.connectAll();
与leader建立连接,可重连4次,超过4次,抛出异常
// 启动定时任务,清除快照数据//purgeInterval <= 0 不启动定时任务//snapRetainCount需大于2,不然会抛异常 DatadirCleanupManager.start();
获取leader
LEADING
初始化连接
createElectionAlgorithm(electionType);
this.mySid != sid
LeaderZooKeeperServer
放入应用层的接收选票队列
connectOne(sid);
break
while循环从应用层发送队列里获取选票
LearnerHandler.run
runFromConfig(config);
cnxnFactory.start();
QuorumServer leaderServer = findLeader();
etLeader(makeLeader(logFactory));
getPeerState()
LOOKING选举主线逻辑
0 条评论
下一页