zookeeper选举Leader源码剖析
2021-05-10 15:20:18 0 举报
zookeeper选举Leader源码详细流程图
作者其他创作
大纲/内容
for循=循环选票箱跟本机投的leader选票比较,如果相等,将选票机器ID加入voteSet中
createElectionAlgorithm(electionType)
7.2、 启动Netty/NIO服务 监听
QuorumPeer.loadDataBase();
7.4、初始化集群选举leader相关对象数据
WorkerSender.run()
FastLeaderElection.lookForLeader()
for顺序拿到机器ID
Listener.run()
如果leader挂掉,会触发finally方法
7.3、启动jetty service 存放服务器信息
返回leader对象
adminServer.start()
this.recvQueue.offer(msg)
选票逻辑
将收到的选票信息放到选票箱里
往所有的集群机器发选票
self.getCurrentAndNextConfigVoters()
对方收到的选票是leading状态执行leading逻辑
一般不会发送,是个bug
messenger.start()
取出选票队列
1、注册JMX启动监听
leader.lead();
2、初始化服务端连接对象zk默认用NIO可以手动指用netty
建立socket连接
voteSet.hasAllQuorums()
main.initializeAndRun(args)
startLeaderElection()
在选举端口监听连接使用socket通信(BIO)
绑定端口和地址
设置本机状态
构建消息体
会把follower最新的数据同步给leader
sendqueue.offer(notmsg)
QuorumCnxManager.Listener
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
(proposedLeader == self.getId()) ? ServerState.LEADING : learningState()
makeLEStrategy().lookForLeader()
setLeader(makeLeader(logFactory));
4、设置选举类型为3
放在发送消息队列
5、初始化内存结构
最终放在接收消息队列异步处理
当leader还没选举出来,所有节点都是LookinggetPeerState() == ServerState.LOOKING
LearnerCnxAcceptor.run()
选票接收机器是本人,则把该消息放入自己的接收队列中this.recvQueue.offer(msg)
选票PK拿收到的选票与自己PK
logicalclock.incrementAndGet()
recvset.clear();
选举算法electionType=3
setPeerState(ServerState.LOOKING);
QuorumPacket qp = new QuorumPacket();while (this.isRunning()) { //当leader挂了,读发过来的流会抛异常 readPacket(qp); //出来leader发过来的数据 processPacket(qp); }
LOOKING
acceptConnections();
//拿到leaderself.getCurrentVote()
FOLLOWER
从应用层发送队列里取选票
queueSendMap
QuorumPeerMain.getQuorumPeer()
具体的选票PK
ackstate == QuorumPeer.ServerState.LOOKING发送选票方是选举状态,则回发本机选出来的leader给发送选票方
FOLLOWING LEADING这种状态一般新机器刚加入已经选举出来leader的集群新机器的选举状态为LOOKING,选举自己其他机器接收到消息会回发一个leader选票给新机器这种选票方式是LEADING或FOLLOWER
发送选票队列放在map里
LOOKING选举状态
self.getPeerState() == QuorumPeer.ServerState.LOOKING
purgeMgr.start()
//清空之前的选票箱
加载数据
ToSend notmsg =new ToSend()
运行接收线程
this.wsThread.start()
while死循环同步leader数据
connectOne(sid)
通过socket BIO方式启动个线程来监听选票
处理取出的选票
quorumPeer.start();
sendNotifications();
false
跟本机投的leader选票比较,如果相等,将选票机器ID加入voteSet中
与对方机器连接
zk.loadData();
集群启动核心流程
QuorumConnectionReqThread.run()
是,本机还处在选举状态
注册自己到leader
7.5、核心方法 启动Leader选举线程
读取其他机器发送的消息
//开启线程监听 监听主从节点传输数据
QuorumPeer.startServerCnxnFactory()
更新选票信息
ServerCnxnFactory.createFactory()
发送选票
将自己的状态设置为LOOKING状态进行下一轮的选举
n.state选票状态
for (LearnerHandler f : getLearners()) { f.ping(); }
清理早期的文件目录
6、将2创建的出初始连接对象放在本服务节点对象中
返回选到的leader,并设置自己节点的currentVote属性里并清空接收选票队列消息
serverSocket = createNewServerSocket() client = serverSocket.accept()
WorkerReceiver.run()
否
当前节点是选举状态会不断从应用层接收队列里拿选票做选举while ((self.getPeerState() == ServerState.LOOKING) && (!stop))
设置当前投票信息(投自己)
while循环发送选票
7、启动服务节点
RecvWorker.run()
处理连接信息
LEADING
传输层发送消息队列处理
if (sid self.getId())
while (running)死循环根据当前节点状态处理相应的业务
否 本机不是选举状态 已经选举出来了leader
quorumPeer.run()
(ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch logicalclock.get())发选票方机器是选举状态 且 选举周期小于本机的选举周期则把本机PK出来的选票回发给发送选票方
加载配置文件里的参数
recvqueue.clear()
从传输层接收队列里取出选票
serverSocket = new ServerSocket();serverSocket.bind(address);
初始化选举相关两个队列开启了两个线程,发送线程和接收线程
this.wrThread.start()
//更新选举周期logicalclock.set(n.electionEpoch)
监听follower节点的sokect连接 addresses = self.getQuorumAddress().getAllAddresses();
拿到leader机器
getPeerState()
true 超过半数选出Leader判断是否还有其他选票加入如果有则进行选票PK,如果新选票获胜,则重新进行选举
运行发送线程
config.parse(args[0]);
ListenerHandler.acceptConnections()
sendNotifications();
放入应用层接收队列中 recvqueue.offer(n);
true对方选票胜
//取出放在发送选票队列queueSendMap.get(sid)//发送选票send(b);
this.mySid != sid
(n.electionEpochlogicalclock.get()接收选举的周期小于自己的选举周期意味着刚发起选票的机器刚加入集群,发起投自己的选票这种选票无效
setFollower(makeFollower(logFactory));
半数比较
createCnxnManager()
清空接收选票队列消息
QuorumPeerMain.main服务端启动类
给其他节点发起心跳 周期性发起
registerWithLeader(Leader.FOLLOWERINFO);
//从Map取出队列放在发送消息队列
for循环执行LearnerCnxAcceptorHandler 的run方法
启动个线程
sendqueue.offer(notmsg);
7.1、获取本地日志文件加载到内存结构上
主逻辑
break;
初始化选举数据管理器
false自己胜
process(m)
3、初始化本地节点服务对象
makeLEStrategy() 返回FastLeaderElection对象
SendWorker.run()
实例化Leader对象并开启一个线程启动sokect绑定端口
选举周期+1
ManagedUtil.registerLog4jMBeans()
类似
queue.offer(buffer)// sendqueue.offer(notmsg);
读到的消息放到接收队列里
syncWithLeader(newEpochZxid);
给所有其他参与选票的节点 发送选票到应用层发送队列
给其他参加选票节点发送选票到应用层选票队列
makeLeader(logFactory)
runFromConfig(config)
quorumPeer.setElectionType(config.getElectionAlg());
跳出循环
传输层接收消息队列处理
startZkServer();
只允许MyID大的节点连接小的节点
接收连接
listener.start()
n == null
follower.followLeader();
//拿到leader机器QuorumServer leaderServer = findLeader();
LearnerCnxAcceptorHandler.run()
选票周期一样,同在选举,发起选票PK
super.start()
sid == self.getId()
this.mySid == sid
updateServerState();
sendqueue.offer(notmsg);
cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
与leader建立sokect连接
sid self.getId()启动发送、接收选票线程
是
(newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))
放在Map队列
voteSet.addAck(entry.getKey());return voteSet;
quorumPeer.setCnxnFactory(cnxnFactory);
(n.electionEpoch logicalclock.get()接收选举的周期大于自己的选举周期这种情况一般发生在自己后启动加入集群或者网络中断恢复后加入集群中,其他机器已经选举好几周期了,需要更新自己的选举周期为最新的
receiveConnection(client)
0 条评论
回复 删除
下一页