Zookeeper
2022-06-23 11:50:06 11 举报
源码zookeeper 投票
作者其他创作
大纲/内容
zk.loadData()
cnxnFactory = ServerCnxnFactory.createFactory();
新建并开启选举算法
while (running && !shutdown && sock != null)
以Netty为例
创建接收队列工作者
setupRequestProcessors()notifyAll();
getPeerState() == ServerState.LOOKING
this.wsThread.start()
选举投票是否已经过半
处理本节点投票,选票箱里的选票对比自己的选票,相同就将投票机器的sid放到voteset
initializeAndRun
recvQueue.add(msg);
应用层
recvQueuequeueSendMapsenderWorkerMaplastMessageSent....
ToSend notmsg = new ToSend(...)
totalOrderPredicate
接收队列再无选票,设置自己的角色,清除选票箱,打印日志返回最终结果
ackstate == QuorumPeer.ServerState.LOOKING如果对方选票还属于选举状态时将我方认为的leader选票发送给对方
receiveConnection(client)
使用jute序列化从输入流中拿去数据
创建sid自己对应的正在进行发送与接收队列SenderWorker RecvWorker
ackSet.size() > half
获取我方已选举出来的leader选票Vote current = self.getCurrentVote()
sendqueue.offer(notmsg);
setPeerState(ServerState.LOOKING);
process(m)
初始化Netty线程组
setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()))
createCnxnManager
getQuorumPeer()
zk.startup()
QuorumPeerConfig.parse(args[0])
makeLeader
listener.start()
queueSendMap .get(sid)send(b);
LearnerHandler().start()
其他机器的选举周期小于自己,说明刚加入并未进入此同等方法
n == null
sendqueue.offer(notmsg)
sw
createElectionAlgorithm(electionType)
将启动服务查看服务端状态信息(默认8080口)
从发送队列中拉取一个选票
quorumPeer.start()
sendNotifications()
向对应主机建立连接
默认为3选举类型(算法)
logicalclock.set(n.electionEpoch); // 更新我方的选举周期为最新 recvset.clear(); // 清除选票箱
quorumPeer.initialize()
与从节点同步数据
绑定业务处理CnxnChannelHandle
quorumPeer.setCnxnFactory(cnxnFactory)
super.start()
这种情况一般是leader已选举出来后新机器加入集群中时会发送自己的选票给其他服务器,其他的服务器会回复自己认为的leader选票给新加入的服务器逻辑类似
updateServerState();
connectAll()
放入发送队列
开始初始化leader选举
创建发送队列工作者
n.electionEpoch < logicalclock.get()
启动开始leader选举
从接收队列拉取一个选票
同步leader
totalOrderPredicate() // 执行选票逻辑updateProposal() // 更新提议选票sendNotifications() // 发送选票
while死循环
if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey()); }
din.readLong
BinaryInputArchive#readRecord
清除过期、时间过长的数据备份文件
new ServerSocket()bind(addr)client = accept() --等待
注册 jmx
通过ID解析地址
runFromConfig(config);
ToSend m = font color=\"#e57373\
设置连接对象放入当前服务节点
super.startup();
self.getPeerState() == QuorumPeer.ServerState.LOOKING
异常处理finallyleader挂了触发
监听follower节点的socket连接
WorkerReceiver(manager).run()
处于选举状态
拉取下来的选票是否为空
QuorumPeer.run()
pipeline.addLast(\"servercnxnfactory\
switch (getPeerState())获取当前的节点选举状态
sid self.getId()
switch (n.state)
通过返回的选举策略进行查找leader
根据 连接服务器类型参数创建服务端连接对象,默认NIO
haveDelivered()
启动参数:zookeeper.serverCnxnFactory
每隔一段时间与发送ping给其他服务器
while((span style=\"font-size: inherit;\
findLeader
对方的sid等于我方的sid,可能是bug
提供消息
处理拉取下来的选票
ss = new ServerSocket();ss.bind(self.getQuorumAddress());
将选票放入应用层接收队列recvqueue.offer(n);
加载数据备份文件日志
放入传输层的队列中
setElectionType(config.getElectionAlg())
purgeMgr.start()
false
case LEADING
新版
第一次启动无选票,与发送的选票的机器建立socket连接
检测发送队列是否为空或已发送完成
接收连接
registerWithLeader(Leader.FOLLOWERINFO);
添加到传输层发送队列
LOOKING
BlockingQueue<ByteBuffer>font color=\"#e57373\
case FOLLOWING
makeFollower(logFactory)
startLeaderElection()
for (LearnerHandler f : getLearners()) {f.ping();}
leader挂了会抛出异常
this.wrThread.start()
case LOOKING
startZkServer()
通过反射实例化
sid == self.getId()
新建通知
NettyServerCnxnFactory()
ManagedUtil.registerLog4jMBeans();
adminServer.start()
开启连接对象服务 NIO或Netty(绑定端口)
选举已完成
(ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch logicalclock.get())如果获取的选票处于选举状态且周期小于己方的选举周期发送pk后的选票给发送方
初始化zk数据库
true:将选票添加到接收队列
QuorumPeerMain.main()
true
break
bootstrap.bind(localAddress)
LearnerCnxAcceptor().run()
3.5.8
发送到的服务器id是否为自己的id自己的选举选票
创建消费者,从下面获取消费
rw
获取新建一个服务节点对象
leader.lead()
startServerCnxnFactory()
FastLeaderElection.start()
选举周期等于自己周期,说明正在大家都正在选举
syncWithLeader(newEpochZxid);
等待选举监听
读取建立连接的sid
loadDataBase()
放入应用层发送队列给其他参与投票者节点
创建定时任务清理
recreateSocketAddresses(id)
从currentVote获取leader
加载初始化zk数据
绑定端口
新建ServerSocket
voteSet.hasAllQuorums()
QuorumCnxManager.Listenerrun()
hasAllQuorums()
TCP 实现了用于leader选举的连接管理器
建立连接
如果对方的选举端口的ServerId 小于 我方的ServerId就关闭连接,为了防止两个机器之间建立重复连接(对方连接我方,我方也连接了对方,socket为全双工)
检测投票过半
初始化 创建 authServer authLearner
readPacket(qp);
处理连接
queueSendMap.get(sid);send(b);
设置选举leader方法
n.electionEpoch > logicalclock.get()
newInstance()
this.myid==sid
setCurrentVote(makeLEStrategy().lookForLeader())
follower.shutdown(); setFollower(null); updateServerState();
ByteBuffer requestBuffer = buildMsg(...)
while ((self.getPeerState() == ServerState.LOOKING) && (!stop))从应用层接收队列获取选票做选举直到当前状态为非选举状态
更新自己节点的状态为looking状态并且进入下一轮的选举当中
while (this.isRunning()) { readPacket(qp); processPacket(qp); }
检测我方选举周期与其他节点的选举周期情况此判断说明我方为后加入的
this.messenger.start
同步leader数据
注册自己的信息到leader中
WorkerSender(manager).run()
cnxAcceptor.start()
connectOne(sid)
connectOne(sid);
初始化选票(自己)
开启节点进行服务
加载配置文件
s = ss.accept();
如果还有其他机器加入将再次验证重新选举
选举周期+1
Message response= font color=\"#e57373\
FOLLOWINGLEADING
ReconfigRequest#deserialize
AtomicLonglogicalclock.incrementAndGet()
Notification n = recvqueue.pollspan style=\"font-size: inherit;\
选举状态
false:获取对应zk服务器ID对应的发送队列
follower.followLeader()
0 条评论
下一页