zookeeper源码之Leader选举
2024-04-10 18:36:24 0 举报
详细zk源码图,从起点开始,到后面总结
作者其他创作
大纲/内容
run
构建消息ToSend notmsg = new ToSend(...)
拿到这里存入的队列
初始化通信方式 要么nio要么netty
虽然还有其他的逻辑,归根到底还是设置本机状态
读取
messenger = new Messenger(manager);
继续循环
接收连接者发送过来的消息RecvWorker
Timer调度器
break;
从queueSendMap拿到队列,然后通过socket发送队列里的消息出去
判断消息是否为空m == null
while (running && !shutdown && sock != null)
关闭socket连接closeSocket(sock);
初始化一系列节点信息完成
//启动集群选举Leader线程super.start();
QuorumPeer为一个线程
是
如果本服务器不是looking选举状态,但是远端服务器是looking选举状态,则将本机器认为的leader发给远端服务器,leader已经有了,我直接告诉你就行了,你不用再选了if(ackstate == QuorumPeer.ServerState.LOOKING)
与leader建立连接,并跟同步leader数据follower.followLeader();
刚选举的时候都是为0
while (self.isRunning())
需要发送过去的投票数据放入这个队列中,
开始初始化一系列节点信息
关闭跟leader的连接follower.shutdown();
初始一个配置信息对象QuorumPeerConfig config = new QuorumPeerConfig();
leader.lead();
是:远端选举周期小于本机器选举周期
判断远端节点的状态n:recvqueue拿到的消息switch (n.state)
sid == self.getId()
senderWorkerMap
设置节点为LOOKING,等待下一次循环重新选举leaderupdateServerState();
需要跟leader同步消息FOLLOWING
continue
实例化Notification(通知)并设置其属性
这种情况一般是已选出leader的集群,有新机器加入的时候,新机器处于Looking状态,会先给自己投票,其他机器收到后会回发已选出的leader票据过来,才来到这个逻辑。这个票据的发送方可能为FOLLOWING或LEADING
//初始化集群选举Leader相关对象数据 startLeaderElection();
非空
由WorkerSender拿出消息并写入queueSendMap中的队列
添加到消息接收队列recvQueue.add(msg);
s = ss.accept();
拿到sock输入流DataInputStream din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
绑定地址ss.bind(addr);
第一次投票自己
获取到发送票据过来的机器地址electionAddr = init.electionAddr;
ackstate = QuorumPeer.ServerState.LOOKING;
当leader挂了,重新选举leader逻辑
投票:第一次投自己,后面票据pk结果放sendqueue队列读票:从recvqueue队列拿取票据做选举逻辑
当有连接过来,开启两个线程:一个发消息线程、一个接收消息线程。
集群
从recvqueue读取票据
返回true:(远端PK胜利)1-远端选举周期较高 2-选举周期相同,但是远端zxid(事务id)更高 3-选举周期相同,事务id相同,远端sid更大。
往队列添加消息queue.add(buffer);
args有我们传入的配置文件路径
拿到本机的state。switch (getPeerState())
又创建消息,准备发送:添加到发送队列ToSend notmsg = new ToSend()
启动WorkerSenderthis.wsThread.start();
设置服务id标识quorumPeer.setMyid(config.getServerId());
使用NIOServerCnxnFactory
args.length == 1判断的是有没有传入配置文件;config.isDistributed()判断也可以猜到就是通过配置文件中是否配置了集群节点信息
this.wr = new WorkerReceiver(manager);
解析配置文件config.parse(args[0])
校验机器是否在集群节点validVoter(sid、leaderId)
初始化Netty线程组#NettyServerCnxnFactory.configure(...)
同步数据syncWithLeader(newEpochZxid);
集群方式创建选举算法electionType=3 this.electionAlg = createElectionAlgorithm(electionType);
代表同一个方法里
启动服务节点quorumPeer.start();
判断接受消息是否为空response == null
启动选举监听线程
n.electionEpoch logicalclock.get()
3
fle.start();
//处理消息process(m);
run()#RecvWorker
从queueSendMap拿到队列:每台机器对应的那个队列ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
队列统一放入集合queueSendMap
run()#WorkerReceiver
queueSendMap拿出来的队列
废话一句:这里我就不管什么Nio还是其他通信方式了,统一以netty为主
start()
addr=127.0.0.1:3001
在发送消息的时候会给所有机器包括自己发送票据
这里我不看关于权限方式的代码,直接跳过或者选择看无认证方式的代码
//初始化选举数据管理器 QuorumCnxManager qcm = createCnxnManager();
state默认为ServerState.LOOKING;选举状态
1
通过socket发送消息
启动内嵌的jetty服务,默认8080(就算被占用了启动不成功也不影响主线程,因为被try起来了),用来通过http请求查看服务端状态信息adminServer.start();
PK完后告诉所有节点
如果之前没有队列就放入新建的队列,有的话就使用之前的
这种情况就是配置不对了,不可能发生,因为前面就会报错。直接打印日志,不作任何处理
当有新节点加入已存在leader的集群
接受连接跟处理连接receiveConnection(client);
FOLLOWING、LEADING
//半数判断逻辑 return voteSet.hasAllQuorums();
run()#WorkerSender
从sendqueue拿取消息并放入到queueSendMap里对应机器的队列中
不存在
如果本服务器不是选举状态了,说明Leader已经选举出来了,同时说明这个远端的服务器是后面加入的,那么我直接告诉它leader是谁就行了
消息需要发送到哪去都是用自己的sid跟目标sid标识,后续就知道谁发送到谁那里去
recvqueue = new LinkedBlockingQueue<Notification>();
获取本机的票据,这个票据一般就是leaderVote current = self.getCurrentVote();
while (running)
发送消息send(b);
传输层:
vsw!=null
添加消息到sendqueuesendqueue.offer(notmsg);
这里个WorkerSender、WorkerReceiver。还有个SendWorker跟RecvWorker。无语...
如果要用netty通信需加上启动参数:-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
地址如果不为空,因为不允许小的连大的,那我是不是可以反过来,比如1不能连2,但是我可以使用2去连1啊。所以这里就是发起2连1。都是为了保证机器间都有一条socket通道且只能有一条
一开始投自己
选举周期原子加1logicalclock.incrementAndGet();
判断是集群方式启动还是单机启动args.length == 1 && config.isDistributed()
设置接受者到发送者属性中sw.setRecv(rw);
//n.electionEpoch:远端选举周期 logicalclock.get():本机器选举周期 //如果远端选举周期小于本机选举周期,那么把自己的pk结果发送给远端if( (n.electionEpoch < logicalclock.get()))
由WorkerReceiver从recvQueue拿取消息放到sendqueue中
设置zk数据库和初始化数据 像/zookeeper节点就是在创建的quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
加载zk数据库loadDataBase();
通过resate确定当前为什么状态:LOOKING、FOLLOWING、LEADING、OBSERVINGswitch (rstate)
开启两个线程
集群方式启动runFromConfig(config);
初始化:recvQueue、queueSendMap、senderWorkerMap、listener、lastMessageSent以及保存本节点信息
初始化数据zk.loadData();
开始新一轮的Leader选举lookForLeader()#FastLeaderElection
messenger不是选举管理器,manager才是this.messenger.start()
发送消息给连接者WorkerSender
看是要commit还是roball数据等,在一致性ZAB那里分析switch(qp.getType())
阻塞
解析消息
case 3
makeLEStrategy():返回之前保存的FastLeaderElection快速选举对象
检查启动属性是否存在zookeeper.serverCnxnFactory,这是一个通信方式,可以传入netty
阻塞获取信息client = ss.accept();
放入
注册JMX(Java管理扩展) 不用管 ManagedUtil.registerLog4jMBeans()
判断节点角色是否为LOOKING,一个节点默认就是LOOKINGgetPeerState() == ServerState.LOOKING
这种情况可能是自己后启动集群选举或者网络中断恢复加入集群,其他机器已经选举好几轮了,所以需要更新自己的选举周期到最新
添加消息到sendqueue队列sendqueue.offer(notmsg);
票据PK
否
设置节点角色,与leader建立连接,并跟同步leader数据break
如果远端选举周期<当前机器选举周期,说明远端机器可能刚启动,这种票据直接废弃。下一次循环会更新到最新周期
true远端的票据胜
由WorkerSender将Pk结果发送出去
创建通信方式工厂,如果我们传入了就采用我们传入的,没有就使用Nio的方式。官方推荐使用netty通信ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
接受消息给连接者WorkerReceiver
读票
开始新一轮的领导人选举。每当我们的QuorumPeer将其状态更改为LOOKING时,就会调用此方法,并且将notifications发送给所有其他集群节点
业务层至传输层:
业务层:
监听端口2181
这个没啥好看的,随便跟进去就能看到就是解析每个属性,然后保存在QuorumPeerConfig对象中
server.1=127.0.0.1:2001:3001:participant2001用来节点间通信,3001用来选举Leader使用
真正通过socket发送消息过去DataOutputStream dout dout.writeInt(b.capacity());dout.write(b.array());dout.flush();
WorkerReceiver的实例this.wrThread.start();
获取本服务节点对象QuorumPeer quorumPeer = getQuorumPeer();
给所有节点发送pk票据结果
LOOKING
读取数据readPacket(qp);
票据放入sendqueue队列中
设置这个发送线程为完成,关闭之前的连接和之前创建的SendWorker跟RecvWorker vsw.finish();
没有过半说明选没有选出leader
监听选举用的端口
PK结果票据放入sendqueue队列中
LEADING
setCurrentVote(makeLEStrategy().lookForLeader());
默认为false,是否为认证性校验quorumSaslAuthEnabled
ackstate = QuorumPeer.ServerState.FOLLOWING;
打印提醒消息
如果发送选票的机器id小于当前机器id,则关闭连接。为什么这么做呢?为了防止机器之间重复建立socket连接(socket连接为双向),zk不允许id小的连接id大的机器。因为每台机器在启动的时候都会主动去跟其他集群节点建立socket连接,本身socket就是双向连接,比如A跟B建立起了连接,那么AB能互相发送消息。B在启动的时候也会向A发起连接,但是因为AB的连接A已经建立好了,所以没必要再建立一条通道,浪费,直接关闭。当然你也可以规定小的连大的,都是一样的,只是zk使用只能大的连小的来限制。
此pk结果过半
如果此服务器(自己)为正在选举状态-LOOKING,则发送选举leaderself.getPeerState() == QuorumPeer.ServerState.LOOKING
QuorumPeerMain.main(String[] args)服务端启动主类
将数据从磁盘加载到内存中,并将事务添加到内存中的commitlog中。@返回磁盘上的最后一个有效zxid(zxid:最大事务id)zkDb.loadDataBase();
投票
//获取到leaderQuorumServer leaderServer = findLeader();
更新自己的选举周期logicalclock.set(n.electionEpoch);
初始化zk数据,集群数据,加载之前持久化数据(以防前面没做)等
如果远端选举周期当前机器选举周期n.electionEpoch logicalclock.get()
finally:如果leader挂了进入
读取远端发送过来的消息并放入recvQueue队列
ackstate = QuorumPeer.ServerState.LEADING;
OBSERVING
发送消息给连接者SendWorker
启动Leader ZooKeeper服务器并将zxid初始化为选举周期startZkServer();
是:本机还是选举状态
一台连接过来的机器只有一个SendWorker跟RecvWorker
this.ws = new WorkerSender(manager);
使用传入的方式通信
false本机器票据胜
vsw != null
这是启动跟客户端通信的netty服务端,默认端口为2181
启动清理快照文件任务,之前服务没用的文件需要清理purgeMgr.start();
ackstate = QuorumPeer.ServerState.OBSERVING;
由WorkerReceiver将消息读出并开始p票据PK
n.electionEpoch = logicalclock.get()需要pk
清空之前的选票箱recvset.clear();
循环所有参与者
读取发送票据机器的idprotocolVersion = din.readLong();
单机
从recvQueue拿取票据并放入到recvqueue队列中
等待投票消息
空
入口
同步数据给从节点cnxAcceptor.start();
存在
当前节点为选举状态,会不断的从应用层接收队列(recvqueue)里拿票据做选举while ((self.getPeerState() == ServerState.LOOKING)&& (!stop))
while (!stop)
其他初始化信息...........
while (!shutdown)
0
run()#SendWorker
run()#LearnerCnxAcceptor
由SendWorker拿出再socket发送出去
创建票据:第一次选举失败,再次选举,重新构建票据Vote v = getVote();
return ((newEpoch > curEpoch) ||((newEpoch == curEpoch) &&((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
放入集合
处理消息里的config版本,config可以从其他节点发送过来
单机环境启动ZooKeeperServerMain.main(args);
一开始是取不到的SendWorker vsw = senderWorkerMap.get(sid);
继续下一次循环
发送pk结果票据sendNotifications();
QuorumPeerMain main = new QuorumPeerMain();
启动netty服务:通信服务startServerCnxnFactory();
继续选举
代表说明
也就是添加票据到应用层的sendqueue发送队列,后续线程真正socket发送
循环结束:关闭连接this.finish();
监听选举使用端口3001
2
开启接受者线程rw.start();
半数判断ackSet.size() > half
给所有参与者发送票据(包括自己)sendNotifications();
启动投票线程
每台机器创建一个队列
sid self.getId()
开启发送者线程sw.start();
while (!stop)
FastLeaderElection快速选举开启两线程
判断消息是否发送给自己this.mySid == sid
当监听被触发,有投票消息或者有连接过来
里面就是设置一个运行状态为false,以及关闭连接操作。同时也会把接受线程也设置完成
为什么要主动反向连接,我等到那个大的机器启动的时候去连接这条通道不好吗?两种都是一样的,只是说我就不需要等,主动发起反向连接
使用BIO方式ss = new ServerSocket();
run()#QuorumCnxManager.Listener
启动选举监听 listener为一个线程listener.start();
这里不是线程哦别搞错了cnxnFactory.start()#NettyServerCnxnFactory
开启监听选举的端口
sendqueue = new LinkedBlockingQueue<ToSend>();
这里有把选举管理器(里面包含SendWorker跟RecvWorker保存的senderWorkerMap、queueSendMap)设置到快算选举算法对象里去
初始化以及运行main.initializeAndRun(args);
继续循环,接受连接。就算是同一个连接,后面因为已经保存了连接信息不,也就是
运行集群选举Leader线程run()#QuorumPeer
从DataInputStream流读取远端数据int length = din.readInt();
否:说明本机已经选举出了leader
代表方法里面
for循环将票据箱中的票据跟本机选出的leader比较,如果相等,把sid加入voteSet中if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey()); }
往recvqueue队列插入消息Notificationrecvqueue.offer(n);
0 条评论
回复 删除
下一页