ZooKeeper-Leader选举源码(3.4.14)
2022-04-07 12:33:13 14 举报
ZK-Leader选举源码(3.4.14)@@@
作者其他创作
大纲/内容
sendqueue.offer(notmsg);
获取选票
整个zookeeper选举底层可以分为选举应用层和消息传输层,应用层有自己的队列统一接收和发送选票,传输层也设计了自己的队列,但 是按发送的机器分了队列,避免给每台机器发送消息时相互影响,比如某台机器如果出问题发送不成功则不会影响对正常机器的消息发送。
将上面创建的初始服务连接对象放入本服务器节点对象
logicalclock.set(n.electionEpoch);
接受的选票选举周期大于自己的选举周期,这种情况可能是自己后启动加入集群选举,或者是网络中断恢复后加入集群,其他机器已经选举过好几轮流,所以需要更新自己的选举周期到最新
while 循环接收啦leader同步的数据
DataNode.deserialize
设置选举类型,默认是3(electionAlg = 3;)
LOOKING选举状态
WorkerSender线程
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
一般不可能发生
bootstrap.getPipeline().addLast(\"servercnxnfactory\
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
启动发送选票线程
同步Leader数据
runFromConfig(config);
从内存中获取选票中最大的zxid的数据
选票PK因为自己的选票周期落后了,可能是刚加入集群选举,所以是拿收到的选票跟投给自己的选票做PK
获取leaderserver
recvqueue = new LinkedBlockingQueue<Notification>();
解析到配置文件加载到内存
初始化Leader管理器
注册自己到leader
sid3<--->RecvWorker
RecvWorker消息接收线程
while (this.isRunning()) { readPacket(qp); processPacket(qp); }
接收fllower数据并开启显线程处理
closeSocket(sock);
机器1
//取出发送选票的队列ArrayBlockingQueue<ByteBuffer> bq = queueSendMap .get(sid);//发送选票send(b);
启动集群选举leader线程
ss = new ServerSocket();ss.bind(addr);Socket client = ss.accept();
queue.add(buffer);
this.electionAlg = createElectionAlgorithm(electionType);
注册JMX
while重传输层接收队列取出选票
sock.close();
如果发送选票方是选举状态并行发送周期小于自己则把自己PK出來的選票回发给发送选票方送
startLeaderElection();
myid=1的机器,投出去(1,0),收到的票是(2,0),将收到的票跟自己投出去的票对比,优先选择zxid大的为leader,zxdi大的机器包含的数据是最新的,如果zxid一样,默认选myid大的为leader,推荐(2,0)成为leader
最终将接收到的选票放入recvQueue队列中异步处理
绑定业务处理CnxnChannelHandler
机器2
sendNotifications();
返回选出的Leader并设置到自己的节点的currectVote属性里
实现类
queueSendMap发送队列每台机器对应一个发送队列
sid1<--->SenderWorker
QuorumCnxManager.Listener.run()
getInitLastLoggedZxid()
是
给发送选票sid这台机器创建一个选票发送器,在接收到选票的时候就给发选票机器建立一个选票发送器线程供将来发选票使用,并启动发送线程
config.parse(args[0]);
启动选举监听
QuorumServer leaderServer = findLeader();
while循环发送选票
false自己胜
if (sid > this.mySid)
SendWorker sw.start();
FOLLOWING
switch (n.state)
第一轮投票 (2,0)选票胜
BinaryInputArchive.readRecord
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
leader.lead();
quorumPeer.start();
给所有其他参与投票的节点发送选票到应用层发送队列总
发送
构建leader处理请求
while
在上一步选举leader之后再看下是否还有新选票加入,如果有,还需要在做下选举的PK,如果新选票胜则需要重新选举
ManagedUtil.registerLog4jMBeans();
初始化集群选举leader相关对象数据
入队
recvqueue.offer(n);
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
myid=1
myid=3
myid=2的机器,投出去(2,0),收到的票是(1,0),还是推荐(2,0)成为leader
初始化Netty线程组BossGroup和WorkGroup
选举核心逻辑
QuorumPeerMain
NettyServerCnxnFactory
SendWorker.run
如果发送选票方是选举状态则把本机认为的leader选票回发给发送选票方
connectionExecutor.execute( new QuorumConnectionReceiverThread(sock));
初始化服务连接器对象zk默认是NIO,官方推荐Netty
if (sid == this.mySid)
Socket(BIO)
投票机器超过半数
connectOne(sid);
makeFollower(logFactory)
WorkerReceiver线程
将接收的选票放入选票箱
zk.loadData();
recvQueue
LeaderZooKeeperServer.setupRequestProcessors
绑定启动端口
发送选票选自己
同步数据到从节点
第一次启动时肯定是没有选票,这时候会根需要发送选票的机器连接链接建立socket链接
选举应用层(机器1)
sid2<--->RecvWorker
leader根据所有fllower定时发送ping请求保持长连接
myid=1的机器,投出去(2,0),收到的票是(2,0),投一台机器的票数已经超过集群的半数,此时选举就结束了,确定(2,0)机器是leader
cnxnFactory.start();
RecvWorker.run
QuorumPeer.run()
setLeader(makeLeader(logFactory));
if(n == null)
if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get()))
while循环接受选票
connectionThreadCnt.incrementAndGet();
if (sid != this.mySid)
启动快速选举算法相关线程
leader选举多层队列架构
t.start();WorkerSender.run
quorumPeer.setCnxnFactory(cnxnFactory);
DataOutputStream dout = null; DataInputStream din = null; dout = new DataOutputStream(sock.getOutputStream()); dout.writeLong(this.mySid); dout.flush(); din = new DataInputStream( new BufferedInputStream(sock.getInputStream()));
启动netty服务
while ((self.getPeerState() == ServerState.LOOKING) && (!stop))
设置内存数据库对象(ZKDatabase)
receiveConnectionAsync(client);
接收的选票周期等于自己,意味着大家一直在参与选举,那么在选举PK的时需要拿收到的选票跟之前自己投的选票做PK
这种状态一般是已经选出leader的集群有新机器加入了,新机器处理LOOKING状态会先选举投票给自己,其他机器收到后会发已选出集群leader选举给新新机器,这个选票的发送方式状态就是FOLLOWING或LEADING
过半数选举leader逻辑
sendqueue
recvQueue.add(msg);
当前节点是选举状态不断从应用层队列里面拿选票做选举
sendqueue = new LinkedBlockingQueue<ToSend>();
接收
main()
处理连接信息
recvqueue
myid=3的机器,启动时发现集群已经选举出来leader了,此时会让自己变成follower
Vote current = self.getCurrentVote();
follower.followLeader();
startZkServer();
选举应用层(机器2)
manager.connectAll();
Main loop根据当前节点的状态做对应的业务处理
quorumPeer.setElectionType(config.getElectionAlg());
清理快照任务
setPeerState(ServerState.LOOKING);
选举周期加1
初始化选票(自己)
QuorumConnectionReqThread.run
获取集群几点对象
在选举端口监听连接选举使用普通的Socket通信(BIO)
客户端执行命令行
读取发送选票机器的sid
给所有其他参与投票的节点发送选票到应用层发送队列
否判断选票的状态,这个状态是发送到选票方的状态
RecvWorkerrw.start();
如果选票接收器接收的是自己放入自己的选票队列
t.start();WorkerReceiver.run
connectOne(sid);
n.electionEpoch > logicalclock.get()
sid3<--->SenderWorker
create /guanbo666
receiveConnection(sock);
第二轮投票 (2,0)选票胜超过半数节点的2号机器成为leader
QuorumMaj.containsQuorum
return self.getQuorumVerifier().containsQuorum(set);
if (sid < this.mySid)
if(ackstate == QuorumPeer.ServerState.LOOKING){
sid = din.readLong();
setupRequestProcessors()
循环选票箱收到的选票,与本机的leader选票对比,如果相等,将投p机器sid加入voteset
myid=2的机器,投出去(2,0),收到的票是(2,0),投一台机器的票数已经超过集群的半数,此时选举就结束了,确定(2,0)机器是leader
生产选票
super.start();
senderWorkerMap消息发送线程
listener.start();
接收连接
当前机器主动发起socket连接到发送选票的id较小的机器
return (set.size() > half);
main.initializeAndRun(args);
NettyServerCnxnFactory.start
process(m);
如果leader挂了 会触触发异常
myid=2
接收的选票周期小于自己,意味着发送选票的机器刚加入集群选举,发起投他自己的选票,这种选票一般都是要废弃
使用jute序列化从输入流里面拿数据,jule类似protobuf.据官方说后面会弃用jute
sid2<--->SenderWorker
数据转为选票
readPacket(qp);
启动服务节点
sendqueue.offer(notmsg);
清空之前的选举箱
逻辑类型
true接收到的选票胜
开启选票接收的线程
if (vote.equals(entry.getValue())){ set.add(entry.getKey()); }
n.electionEpoch < logicalclock.get()
loadDataBase();
如果发送选票的机器id小于当前机器的则关闭连接,为了防止机器之间相互重复建立socket连接(双向的),zk不允许id小的机器连接id大的机器
syncWithLeader(newEpochZxid);
LearnerHandler.run
archive.startRecord(\"node\"); data = archive.readBuffer(\"data\"); acl = archive.readLong(\"acl\
QuorumConnectionReceiverThread.run()
如果要使用Netty通信,需要加上启动参数:-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
setCurrentVote(makeLEStrategy().lookForLeader());
recvset.clear();
实现
核心启动流程
purgeMgr.start();
quorumPeer.initialize();(SaslQuorumAuthServer、SaslQuorumAuthLearner)
与leader建立连接,并接收leader数据同步
recvQueue.add(msg);
发送选票线程
启动接收选票线程
如果本机的leader和本机的sid一样,则自己就是leader,否则就是FOLLOWING或者OBSERVING
初始化服务
setFollower(makeFollower(logFactory));
n.electionEpoch = logicalclock.get()
LOOKING
sid1<--->RecvWorker
启动或leader宕机选举leader流程
主动向leader发起socke连接
while (running)
qcm = createCnxnManager();
是本机还处于选举状态放入应用层接收队列中
switch (getPeerState())
sw.setRecv(rw);
DataInputStream din = new DataInputStream( new BufferedInputStream(sock.getInputStream()));
给发送选票sid机器初始化一个发送选票队列放入Map
LearnerCnxAcceptor.run
FastLeaderElection.lookForLeader
zk.startup();
bootstrap.bind(localAddress);
接收选票线程
quorumPeer = getQuorumPeer();
FOLLOWING andLEADING
Socket s = ss.accept();
LEADING
更新自己的选举周期
放入传输层待放入队列
break;
cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start();
this.messenger = new Messenger(manager);
判断sid类型
logicalclock.incrementAndGet();
将选票发送器与sid对应放在map里面
CnxnChannelHandler.channelread
否本机不是选举状态,已经传出了leader
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING)
加载文件数据到内存
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState());
leaveInstance(endVote); return endVote;
0 条评论
下一页