zookeeper选举源码分析
2022-09-20 14:41:33 0 举报
zookeeper选举源码分析
作者其他创作
大纲/内容
创建收发选票的队列
manager.connectAll();
将自己的myid、选举端口发给其他选举节点
将数据同步给observer节点
n==0?
setCurrentVote(makeLEStrategy().lookForLeader());
连接上myid=1
super.start();
开启一个线程等待连接
由之前的选举算法创建可知,这里采用的是FastLeaderElection
receiveConnection(client);
选举类型默认是3走case 3分支
QuorumPeer是Thread子类
this.electionAlg = createElectionAlgorithm(electionType);
inform(p);
否收到其他节点的选票
第一次选举周期是相等的
sid就是节点myid
RecvWorker#run()
创建两个线程
处理收到的数据
cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start();
开始
收到过半的ACK则发送commit给其follwer节点
this.ws = new WorkerSender(manager);this.wr = new WorkerReceiver(manager);
一开始启动一个节点,是收不到选票的,此时n==0
s = ss.accept();
将从channel收到的选票放入recvqueue中
第一次myid=2会胜出,投myid=2节点一票
通过之后选出leader
把数据写到内存结构中去这样客户端就能读到数据了
QuorumPeer#run()
从queueSendMap中取BlockingQueue的数据发送
zk.commitProcessor.commit(p.request);
开启一个线程:listener.start();QuorumCnxManager.Listener#run()
loadDataBase();
din = new DataInputStream( new BufferedInputStream(sock.getInputStream()));
// K1 获胜的选票有如下特征:// 1、选举周期是最新的 newEpoch > curEpoch// 2、或者 2.1 周期相等,事务id最大获胜// 3、周期和事务id都相等下,myid最大的获胜
从queueSendMap取出所有参与选举的节点建立连接
sendqueue有选票了
这里实例了sendqueue
process(m)
leader走LEADING
将投票放入发送队列中
会判断还是不是选举状态
bufferedOutput.flush();
发送给远端
发给远端地址
每次收到客户端连接,都会创建这两个线程
myid=2 连接 myid=1 是ok的
数据到来
while(true)
purgeMgr.start();
等待其他节点的连接事件
发送数据
super.startup();
fle.start();
将自己的选票赋值给当前选举算法
if (!p.hasAllQuorums()) { return false; }
创建两个线程1、发送socket线程2、收取socket线程
一般是false,会走第二个分支
这里有个判断只能(myid大的连小的),否则反向连接nio是全双工的,只需要一个通道
创建两个收发选票的队列
org.apache.zookeeper.server.quorum.QuorumPeerMain.main
ss = new ServerSocket();addr = self.getElectionAddress();ss.bind(addr);
Boolean.getBoolean(\"readonlymode.enabled\")
例如:quorumPeer.setElectionType(config.getElectionAlg()); // 选举类型 ,默认3quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));// 内存数据结构
收到选举channel发过来的数据
PK选票
收集选票,判断过半原则
根据配置来判断是否开启集群启动
recvQueue.add(msg);
self.getPeerState() == QuorumPeer.ServerState.LOOKING
处理ACK
setupRequestProcessors();
p = queuedPackets.poll();
开启一个线程
服务器端启动过程
runFromConfig(config);
解析配置文件到配置类
quorumPeer.start();
QuorumConnectionReqThread#run
是实例化选举BIO客户端与远程选举节点建立连接
启动发送和接收线程
sid < self.getId()
sendNotifications();第一次投票给自己
收选票线程WorkerReceiver#run()
构造ToSend实例放入sendqueue
lookForLeader()里面while ((self.getPeerState() == ServerState.LOOKING) &&(!stop))
如果选票是发给自己的
否已经选举出leader
main.initializeAndRun(args)
leader.lead();
实例化一个QuorumConnectionReqThread线程
queuedPackets.add(p);
sendqueue = new LinkedBlockingQueue<ToSend>(); recvqueue = new LinkedBlockingQueue<Notification>();this.messenger = new Messenger(manager);// 创建两个线程
startSendingPackets();
是:处于选举状态把选票放入recvqueue
选举算法FastLeaderElection创建完毕开始正式选举
发送自己PK出来的选票sendqueue.offer(notmsg);
leader选举
case Leader.ACK:
选举周期判断
主逻辑在while(true)
收到的数据格式在:QuorumCnxManager#startConnection体现
connectOne(sid);
开启一个定时任务清理快照和日志
ZooKeeperServerMain.main(args);
启动收发选票线程
实例化选票1、自己的myid2、最大的事务id3、选举周期
FastLeaderElection#lookForLeader
建立leader必要的环境构建责任链模式
创建选举算法
将发送线程放入map<myid,SendWorker>如果map中之前有,则移除。确保每次收到都会新建一个SendWorker
建立节点直接传输数据的IO
SendWorker#run()
while(!stop)
根据客户端端口反射生产Nio实例
startLeaderElection();
启动netty服务端绑定与客户端交互的端口
创建选举的SocketSocket
notifyAll();
fh.start();
startZkServer();
QuorumPeer#run() --> while(true) 会不断的收取选票
commit(zxid);
创建配置类
选举出leader
case LOOKING:
new QuorumPeerConfig()
一开始peerState是LOOKING
给配置类QuorumPeer填充属性
发选票线程WorkerSender.run()
client = ss.accept();
adminServer.start();
加载日志文件到内存
单机
sw.start();rw.start();
return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
zk.startup();
构建责任链模式
ToSend notmsg = new ToSend(xxx);sendqueue.offer(notmsg);
加载数据
startServerCnxnFactory();
将客户端的数据发送给其他节点
集群启动
setLeader(makeLeader(logFactory));
选举的socket客户端:假设这里又开启一台机器myid=2
第一次的状态都是LOOKING。FastLeaderElection
假设开启集群启动
zk.loadData();
QuorumCnxManager#initiateConnectionAsync
其他节点发过来的数据由RecvWorker线程负责放入 recvQueue中
从选举channel中收数据
将收到的选票放入recvQueue
recvqueue.offer(n);
QuorumPeer:current、eletionAlg
LearnerCnxAcceptor#start()
启动Jetty服务器目的是展示一些系统的状态属性
LearnerCnxAcceptor#run()
这里可以根据系统参数来选择是否选择Netty.ServerCnxnFactory:服务端与客户通信的Nio工厂接口
如果对方处于选举状态并且选举周期小于自己
Proposal p = outstandingProposals.get(zxid);p.addAck(sid);
判断是否收到了过半的ACK
sendPackets();
假设:另一台机器myid=2开启
dout.writeLong(PROTOCOL_VERSION);dout.writeLong(self.getId());String addr = formatInetAddr(self.getElectionAddress());
表明这是一个选举专用的BIO
0 条评论
回复 删除
下一页