5、zookeeper中leader选举过程
2021-01-28 14:58:09 0 举报
仅限于zk的leader选举过程,其中包含两段选举
作者其他创作
大纲/内容
开始向其他节点同步数据
Messenger
消息发送队列,用于保存那些待发送的消息,按照SID进行分组
startLeaderElection();
初始线程组
myid
WorkerReceiver
发送心跳机制ping其他节点保持节点之间的活性
解析配置文件到内存中
sendqueue<ToSend>
接收线程,接收来自其他server的消息并处理这些消息,在选举过程没有停止的时候,接收线程就循环处理接收到的消息。处理消息的过程是:在外部消息队列recvQueue中取得一个消息,如果这个消息的发送端是个observer,那么立刻回复,将本server要选举的leader、当前状态、epoch信息发送过去。如果这个消息来自于一个participate,那么按本server的状态讨论,如果本server处于looking状态,那么将该消息放到本地接收队列中,同时判断发送方的状态,如果发送方也是looking状态并且其epoch值比较小,那么回复它一个消息告知本server的选举的leader信息。如果本server不是looking状态,而对方是looking状态的话,就回复一个消息告知本server所知道的leader信息。
super.start();
QuorumServer leaderServer = findLeader();
quorumPeer.setXXXX();
启动内嵌jetty服务
queueSendMap
④
purgeMgr.start();
args.length == 1 && config.isDistributed()
否
发送自己的选票结果
无论什么情况都将自己本地记录的leader修改成选举后的结果
sendqueue
启动Netty服务
启动等待新follower的线程
绑定业务处理CnxnServerChannelHandler
// 启动Leader线程ZooKeeperServer.setupRequestProcessors
选举监听
此处注意:getInitLastLoggedZxid()其实zxid是从ZKDatabase中的dataTree获取的,也就是未commit到dataTree中的zxid都是不算的
初始化选票(将自己作为初始化选票)
LearnerHandler.run()
同步Leader数据
SendWorker
接收的数据
设置自己的状态如果选举出来的id和自己是一样的话,那么就直接设置自己是leader
⑥
sendqueue.offer(notmsg);发送给应用层,交给其他的节点进行选举
有
config.parse(args[0]);
fle.start();
new SendWorker
初始化Netty
RecvWorker
s = ss.accept();
处理连接信息
加载本地数据
zkServer.sh
⑦
listener.start();
QuorumPeerMain.main服务器端启动类
1、先拿到recvQueue中的数据2、在循环调用senderWorkerMap将数据中的SendWorker发送给在zk上的其他节点
开始向其他节点进行同步数据
Listener.run();
返回选出的leader并设置到自己的属性中
接收消息队列作用:用于存储接收的所有选票
this.messenger = new Messenger(manager);
senderWorkerMap
switch (getPeerState())根据节点判断自身的状态
工作接收器作用:将用来收集其他zk节点的选票
zk_leader_1
2、sid == self.getId()
WorkerSender
makeLeader(logFactory)
作用:每一个与自身节点建立连接并【发送】数据的线程,从queueSendMap中获取需要发送的消息,且发送给对应的节点
没有
绑定端口
sendNotifications();
ServerBootstrap bootstrap = new ServerBootstrap()
发送的数据2
作用:主要用于记录zk服务于比自己小的节点<节点id,SendWorker>,zk只允许自己去连接比自己小的节点
接收选票器作用:每一个与自身节点建立连接并【接收】数据的线程。将数据汇总到recvQueue中去
new WorkerReceiver
FastLeaderElection
进行连接
ArrayBlockingQueue
createCnxnManager();
一般不会发生,发生的话也是进行警告,并不做什么事情
选票发送队列,用于保存待发送的选票。
startZkServer();
不进行业务处理,进入到下一次循环中去
开启WorkerSender与WorkerReceiver这两个线程,让他们开始发送与接收选票
主要是zk只允许myid大的去建立myid小的连接,不允许myid小的对myid大的进行创建连接
new RecvWorker
QuorumCnxManager
启动内嵌jetty服务,默认端口8080,用于监控整个zk状态
zk.loadData();
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
connectOne(sid);
客户端命令
1、从recvQueue循环获得数据2、进行校验、解析数据、处理业务逻辑等3、判断是否是无权投票的机器3.1、是:就将数据放到发送数据队列,直接返回数据3.2、不是:放到recvqueue中,等待发送数据
发送消息队列
与Leader建立连接
recvqueue
接收的选举周期大于自己的选举周期适用于最后启动成功,再加入集群的机器
QuorumPeerMain.initializeAndRun
开始绑定地址进行与客户端连接ss = new ServerSocket();ss.bind(addr);setSockOpts(client);
sendqueue
while (this.isRunning()) { readPacket(qp); processPacket(qp);}
①
选举周期小于自己这种情况本身不操作
消息接收队列,用于存放那些从其他服务器接收到的消息。
1、创建SendWorker(消息发送者)、RecvWorker(消息接受者)2、给senderWorkerMap(发送工作者组)中放入sid(zk节点配置的myid)、SendWorker(消息发送者)3、给自己投一张leader选票4、启动SendWorker(消息发送者)、RecvWorker(消息接受者)线程
updateProposal()
读取发送选票的ID
选举主逻辑LOOKING状态
②
makeFollower(logFactory)
将选票保存到发送消息队列sendqueue.offer(notmsg);
n.electionEpoch > logicalclock.get()
senderWorkerMap
quorumPeer.start();
totalOrderPredicate
if(n == null)
fh.start();
create /vico 666
queueSendMap
③
负责发送消息到其他server。发送消息的过程是:只要选举没有停止就从本地发送队列sendqueue中取一个消息并将它发送到指定的server。
重新去选举一次保证自己不会遗失选票如果不够半数的话,那就重新拉去,等到半数再进行选举
((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))其中选举为leader有3种情况:1、新的选举周期大于当前选举周期,选择选举周期大的2、选举周期相同时,选择zxid大的3、zxid相同时,选择id大的
创建选举算法
放入自己的选票箱中,用于后面进行计算过半选举
调用main方法进行启动
cnxAcceptor.start();
初始化zk配置
zk.startup();
makeLEStrategy().lookForLeader());
while(running)死循环,用于切换状态
sendNotifications();
recvqueue<Notification>
quorumPeer = getQuorumPeer()
ByteBuffer
logicalclock.incrementAndGet();
follower.followLeader();
CnxnChannelHandler.channelRead()
cnxAcceptor = new LearnerCnxAcceptor();
设置当前节点的状态
LearnerCnxAcceptor.run()
recvQueue
把选票发送给全部的节点
BinaryInputArchive.readRecord()
receiveConnection(client);
FastLeaderElection.lookForLeade()
startServerCnxnFactory();
n.electionEpoch < logicalclock.get()
逻 辑 类 似
ServerCnxnFactory.createFactory()
QuorumCnxManager.run
ZooKeeperServerMain.main(args)
leader.lead();
选票接收队列,用于保存接收到的外部投票。
protocolVersion = din.readLong();
QuorumCnxManager
loadDataBase();
开启死循环用来接收Leader数据
作用:用于接收别的节点发送过来消息的队列
选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。
pipeline.addLast(\"servercnxnfactory\
把自己注册到Leader中去
case LEADING
是
n.state判断选票的状态(是发送方的选票状态)
// 注册JMXManagedUtil.registerLog4jMBeans();
WorkerSender
NettyServerCnxnFactory
syncWithLeader(newEpochZxid);
初始化服务端连接对象,默认使用BIO,官方推荐使用Netty。如需Netty的话,在启动命令上添加参数-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
工作发送器作用:将这些选票按照sid发送给zk其他节点
1、读取发送选票的ID2、从senderWorkerMap中获取一次sid3、关闭这个连接4、自己重新建立与sid的连接
adminServer.start();
new WorkerSender
case FOLLOWING
接收follower数据开始线程进行处理
这种一般是已经选出leader了,然后机器再次启动后,自己投票给自己,其他机器收到后,会发已选出的leader给它,这个发送方的状态就是leader/follower
return endVote;
3、sid > self.getId()
获取本服务节点
1、sid < self.getId()
此处通过启动时是否有参数,判断是否为集群启动
1、将所有的选票过滤一遍,得到接受到的机器发送的最后的选票2、先按照配置的进行分组,再累计每个分组中的权重值3、统计出权重值最大的那个分组4、进行比对数值是否过半
重新统计一次看是否还有新的选票
case LOOKING
当前选票+1
设置属性注入zookeeper服务中去,其中包含较为重要的:1、dataLogDir:数据日志存储地址2、dataDir:数据存储地址3、electionAlg:默认选举方式3,快速选举4、myid:配置的myid5、ZKDatabase:初始化内存对象6、cnxnFactory:将上面配置好的服务对象,放到本节点中去
作用:用于自己给其他zk节点发送消息的队列<节点id,消息队列>,
获取选举的Leader Server
zk_following_2
选票发送器,不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中
开始进行选举
QuorumPeer.run();
this.electionAlg = createElectionAlgorithm(electionType);
如果本身是looking状态就一直循环
for (LearnerHandler f : getLearners()) { f.ping();}
ss = new ServerSocket();
bootstrap.bind(localAddress)
初始化选举数据管理器
启动并清除快照文件
runFromConfig(config);
Messenger
// 读取Leader的命令readPacket(qp);// 解析Leader的命令processPacket(qp);
初始化集群选举leader对象相关配置
sid
尝试与其余节点进行建立连接 manager.connectAll();
选举周期与自己相等直接进行选举然后更新自己的leader再发送选票
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
发送器集合,每个SenderWorker消息发送器,都对应一台远程Zookeeper服务器,负责消息的发送,也按照SID进行分组。
开启选举线程
发送的数据1
super.startup();
WorkerRevicer
⑤
recvQueue
启动服务节点
recvQueue<Message>
发送消息队列作用:存储用于发出去的选票
NettyServerCnxnFactory.start()
主要是创建了:WorkerSender与WorkerReceiver
WorkerReceiver
客户端id与自身id进行比对
Socket
主要是创建了两个消息队列LinkedBlockingQueue:1、sendqueue<ToSend>2、recvqueue<Notification>
QuorumConnectionReqThread.run()
0 条评论
回复 删除
下一页