zookeeper的Leader选举源码分析详解
2021-02-25 17:02:33 4 举报
zookeeper的Leader选举源码分析详解
作者其他创作
大纲/内容
createElectionAlgorithm(electionType);
cnxAcceptor.start();
config.parse(args[0]);解析配置加载内存
readPacket(qp);
zk.startup()
leader跟所有follower定时发送ping请求保持长连接
//更新自己的选举周期logicalclock.set(n.electionEpoch);//清空之前的选票箱recvset.clear();
if(n == null)
this.wsThread.start();
不相等
返回选出的leader并设置到自己节点的CurrentVote属性里
ss = new ServerSocket();
2
quorumPeer = getQuorumPeer();
接收follower数据并开启线程处理
while ((self.getPeerState() == ServerState.LOOKING) && (!stop))当前节点选举状态会不断从应用层接收队列里边拿选票做选举
通过定时任务清理过期时间长的快照文件
选举周期相等
监听Follower节点的socket连接
初始化服务端链接对象zk默认使用的NIO官方推荐使用Netty
createCnxnManager();
具体的PK逻辑
阻塞接收
QuorumVerifier.containsQuorum(Set<Long> set);
3
connectOne(sid);
listener.start();
//启动选票接收器线程rw.start();
给所有其他参与投票的节点发送选票到应用层发送选票队列里去
LOOKING选举主线逻辑
self.getCurrentVote()获取本机当前选票,这个选票一般是leader
初始化连接
获取Leader Server
protocolVersion = din.readLong();
JettyAdminServer.start()
是否启动netty服务
while死循环根据当前节点状态做对应的业务处理
RecvWorker.run()
QuorumConnectionReqThread.run()
zk.registerJMX
findLeader();
接收的选票选举周期小于自己,意味着发选票的机器刚刚加入集群选举,发起投他自己的选票,这种选票一般都要废弃掉。 break;
同步数据给从节点
5
从传输层接收队列里取选票
初始化投票对象
选举周期小于自己
发送选票给自己
使用jute序列化从输入流里拿出数据,jute类似protobuf,据官方说会废弃jute
接受的选票选举周期等于自己,意味着大家一直在参与选举,那么在选举PK时需要去拿收到的选票跟之前自己投的选票做PK。
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
startZkServer();
投票机器超过半数
设置默认选举类型,electionAlg = 3;
ZooKeeperServer.startup()
将上面创建的初始服务连接对象放入本服务节点对象
创建
启动集群选举leader线程
QuorumCnxManager.Listener -->run();
startServerCnxnFactory();
接收连接
是
ss = new ServerSocket();ss.bind(self.getQuorumAddress());
for (LearnerHandler f : getLearners()) { f.ping(); }
初始化集群选举Leader相关对象数据
follower.followLeader();
sendqueue.offer(notmsg);
half = votingMembers.size() / 2;
WorkerSender.run()
manager.connectAll();第一次启动的时候肯定是没有选票的,这时会跟需要发送选票的机器建立连接
核心启动流程
while死循环接收leader同步的数据
建立socket连接
4
初始化内存数据库对象
((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
purgeMgr.start();
更新选票发送
6
读取发送选票的机器id
if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch logicalclock.get()))如果发送选票方是选举状态并且发送选举周期小于自己,则把自己PK出来的选票回发给发送选票方
while (true)
开启清理快照线程任务
配置使用Netty
self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState());
closeSocket(sock);//如果发送选票的机器id小于当前机器的id,则关闭连接,为了防止机器之间相互重复建立socket连接(双向的),zookeeper不允许id小的机器连接id大的机器。
放入应用层接收队列recvqueue.offer(n);
主动向leader发起socket连接
Pipeline
follower.shutdown();setFollower(null);updateServerState();
ServerCnxnFactory.createFactory();
相等
启用选举监听
CnxnChanerHandler.channelRead
否本机不是选举状态,已经选出leader了
if (this.mySid == sid)
Create /test 123
setFollower(makeFollower(logFactory));
this.messenger.start();
main.initializeAndRun(args);初始化配置文件并加载到内存
sendqueue.offer(notmsg)
switch (n.state)判断选票状态这个状态是发送选票的状态
7
for循环选从选票箱拿与本机投的leader选票对比,如果相等,则将投票机器Sid加入voteSet里
是本机还处于选举状态
Leader.LearnerCnxAcceptor.run()
给所有其他参与投票的节点发送选票到应用层发送队列里
如果leader挂了,这里从leader取数据会抛异常
8
接收的选票选举周期大于自己的选票周期,这种情况可能是自己后启动加入集群选举的或者网络中断恢复后加入集群选举的,其他机器已经选举过好几轮了,所以需要更新自己选举周期到最新的。
1
LOOKING选举状态
quorumPeer.setElectionType(config.getElectionAlg());
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY)
voteSet.hasAllQuorums();
while循环从应用层发送队列里取选票
异步连接初始化
startLeaderElection();
p.addLast(new ChannelInitializer<Channel>() {...}
客户端执行命令
与Leader建立连接并接收leader数据同步
sendNotifications();
如果Leader挂了,就会触发异常并执行finally
QuorumPeer.run()
setCurrentVote(makeLEStrategy().lookForLeader())
从应用层拿选举票
绑定端口
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
因为自己选举周期已经落后了,可能是自己刚加入集群选举,所以是拿收到的选票跟投自己选票进行PK
logicalclock.incrementAndGet();
运行接收选票的线程
将自己的状态改为LOOKING,进入下一轮while循环开始选举
if(ackstate == QuorumPeer.ServerState.LOOKING) 如果发送选票方是选举状态则把本机认为的leader选票回发给发送选票的的选票方
leader.lead();
super.start();
初始化并加入到pipeline中
FOLLOWING
处理取出的选票
过半数选举Leader逻辑
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
s = ss.accept();
runFromConfig(config);
fzk.registerJMX
setPeerState(ServerState.LOOKING);
尝试与对方sid机器建立连接
n.electionEpoch > logicalclock.get()
是否设置了安全验证
9
LEADING
return endVote;
将接收到的选票放入选票箱
bootstrap.bind(localAddress).syncUninterruptibly().channel();
启动快速选举算法相关线程
ackSet.size() > half
registerWithLeader(Leader.FOLLOWERINFO);
初始化Netty线程组bossGroup和workerGroup
初始化LeaderZookerServer数据
if (sid self.getId())
quorumPeer.setCnxnFactory(cnxnFactory);
注册自己到Leader
switch (getPeerState())
sendNotifications();发送选票
在发送选票的时候,也建立一个接收选票器,并启动接收线程
创建投票算法,默认值为3
process(m);
TailContext
注册JMX
LearnerHandler.run()
在上一步选出leader之后再看下是否还有新选票加入,如果有还需要再做下一轮选票PK,如果新的选票获胜则需要重新选举
queuedPackets.add(p);
this.wrThread.start();
false自己的选票胜利
从接收选票的队列里拿
protocolVersion>=0
启动服务节点
adminServer.start();
选举周期大于
同步Leader上的数据
QuorumPeerMain.main()服务端启动主类
quorumPeer.start();
放入阻塞队列执行
sid == self.getId()
获取本服务节点对象
否
syncWithLeader(newEpochZxid);
serverCnxnFactoryName!=null
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING)
启动内嵌jetty服务,默认8080端口,主要用来查看服务状端状态信息
放入传输层代发送队列
true接收到的选票胜
receiveConnection(client);
在选举端口监听连接选举使用的普通的socket通信(BIO)
异步线程创建连接
if (vote.equals(entry.getValue())){ voteSet.addAck(entry.getKey());}
加载文件数据到内存
构建Leader请求处理链条
while循环发送选票
ss.bind(addr);client = ss.accept(); setSockOpts(client);
FOLLOWINGLEADING这种状态一般是已经选出了Leader的集群有新机器加入了,新的机器处于LOOKING状态会先选主投票给自己,其他机器收到后会回发已经选出的集群Leader选票给新机器,这个选票的发送方状态就是FOLLOWING或者LEADING
对方收到的选票是LEADING状态执行LEADING逻辑
从节点注册JMX
QuorumPacket qp = new QuorumPacket();while (self.isRunning()) { readPacket(qp);}
初始化选举数据管理器
选举周期加1
初始化自己的选票
进行选票PK
recvQueue.add(msg);
HeadContext
NettyServerCnxnFactory.start()
处理连接信息
ChannelInitializer
ManagedUtil.registerLog4jMBeans()
NettyServerCnxnFactory()
逻辑类似
运行发送选票的线程
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
//设置本机状态 self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState());
quorumPeer.initConfigInZKDatabase();
最终将接收的选票放入队列中进行异步处理
接收选票信息加入接收队列
loadDataBase();
zk.loadData();
绑定业务处理CnxnChanerHandler
startSessionTracker(); setupRequestProcessors(); registerJMX(); setState(State.RUNNING); notifyAll();
setLeader(makeLeader(logFactory));
if (sid self.getId())否则启动工作线程接收数据。
WorkerReceiver.run()
super.startup()
SendWorker.run()
0 条评论
下一页