ZK-3.5.8 集群启动源码分析
2023-08-22 18:27:43 4 举报
ZK3.5.8 服务端集群启动源码分析(全网最清晰)
作者其他创作
大纲/内容
startServerCnxnFactory();启动Netty服务
尝试获取新的选票
WorkerReceiver.run()
purgeMgr.start();清理快照文件任务
1、输入流读取数据
ping =new QuorumPacket
如果过半机制返回真
1、如果leader 挂掉就不会接收到心跳Ping
true
n.electionEpoch == logicalclock.get()一开始,选举周期肯定是相等的
初始化socket
this.messenger = new Messenger(manager);
通过DataInputStream读取Socket连接的数据// 最终将数据放到这个队列recvQueue.add(msg);
logicalclock.set(n.electionEpoch);recvset.clear();更新选举周期,清空本机选票箱
recvQueue.add(msg);选票是当前机器,放入自己的接收队列中
Follower与Leader建立起连接,同步数据
setCurrentVote(makeLEStrategy().lookForLeader());使用上述说的快速选举算法,最终选出leader
QuorumPeerMain.main服务端启动主类
n.electionEpoch logicalclock.get()
构造客户端请求处理责任链
2.2、FOLLOWING
config.parse(args[0]);解析缓存配置文件
case Leader.PROPOSAL:
在leader节点上 找到对应follow的zxid位置,从这个地方开始同步数据
this.wsThread.start();发送选票线程
listener.start();启动一个线程,接收选举
LOOKING选举主线逻辑
从对应的传输层队列拿选票
初始化服务端连接对象,默认用IO,官方推荐用Netty如果要用netty通信加上启动参数:-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
syncWithLeader(newEpochZxid);
ss = new ServerSocket(self.getQuorumAddress().getPort());
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));span style=\"font-size: inherit;\
因为上面一系列步骤还是比较多的,有可能又会有新的选票
n.state发送方的选票状态
cnxAcceptor.start();开启线程
return voteSet.hasAllQuorums();过半选举机制
2、follow节点抛异常跳出循环,进入finally
While循环
进入 follower.followLeader()
WorkerSender.run()
break;选举周期小于本机,表示选票的机器刚加入集群,废弃处理
connectOne(sid);与远端机器建立连接
ss = new ServerSocket();使用普通的socket通信(BIO)
如果没有新的选票
添加到阻塞队列
return (ackSet.size() > half);
receiveConnection(client);接收连接
font color=\"#323232\
通过反射构造实例
sid self.getId()
main.initializeAndRun(配置文件);
quorumPeer.start();启动服务节点
half = votingMembers.size() / 2;
channelActive(...)
leader.lead();
new Leader(...)
FollowerZooKeeperServer,在下面也会构造它的处理责任链
quorumPeer = getQuorumPeer();quorumPeer.setZKDatabase初始化内存数据库
sendqueue.offer(notmsg);选票消息往应用层队列中放
初始化Netty线程组,bossGroup和workerGroup
this.mySid == sid
startLeaderElection();启动leader选举
提交到线程池,执行run方法,和参与选举的机器建立Socket连接
queuePacket(ping);
super.start();开始选举逻辑
finally { follower.shutdown(); setFollower(null); updateServerState(); //改为LOOKING状态}
sendNotifications();然后给所有参与选举的机器,发送胜利的选票
this.wsThread -> WorkerSenderthis.wrThread -> WorkerReceiver
syncProcessor.processRequest(request);
写事务日志
(RecvWorker)sw.start();
ServerCnxnFactory.createFactory();cnxnFactory.configure(客户端通信端口,...
process(m);处理选票
比较自己与远端机器的myid
connectToLeader(font color=\"#f8d5b5\
endVote
zk.startup();
channelHandler = new CnxnChannelHandler();
如果选票胜利的一方是自己,则设置自己是Leader否则判断自己是不是能够参与选举,如果能够则设置自己是Follower
channelRead(...)
proposedLeader = leader;proposedZxid = zxid;proposedEpoch = epoch;
client = ss.accept();等待连接
span style=\"font-size: inherit;\
n.electionEpoch logicalclock.get()当前机器可能因为后启动或者网络原因落后别的机器的选举周期
pipeline.addLast(\"servercnxnfactory\
如果有新的选票,再走一遍选举
2、处理集群通信包
runFromConfig(config);核心启动流程
1、LOOKING
sendqueue.offer(notmsg);放入内存队列中
这里注意: 一个是SendWorker 一个是WorkSender
选举之前是LOOKING状态
2.1、LEADING
startZkServer();
绑定监听Follower节点的数据端口
(QuorumCnxManager)listener.run()
sendNotifications();给所有参与选举的机器,发送选票
NettyServerCnxnFactory()
sendqueue = new LinkedBlockingQueue<ToSend>();recvqueue = new LinkedBlockingQueue<Notification>();
LearnerCnxAcceptor.run
使用jute序列化从输入流里拿数据
n==null
(SendWorker)sw.start();
queuedPackets.add(p);
while (running)
NettyServerCnxnFactory.start()
LeaderZooKeeperServer.setupRequestProcessors(...)
刚启动时没有选票
processPacket(qp);
logicalclock.incrementAndGet();选举周期加1
createElectionAlgorithm(选举类型:3);
拿到选票
QuorumPacket qp = new QuorumPacket();while (this.isRunning()) { readPacket(qp); processPacket(qp);}
zk = new LeaderZooKeeperServer
false
true,收到远端的选票方胜
第一轮选举:最终都只留下一张胜利的选票,所以第一轮肯定不会满足过半机制
while死循环接收 leader同步数据
write(...)
与远端机器建立连接
manager.connectAll();
try { follower.followLeader(); } catch (Exception e) { LOG.warn(\"Unexpected exception\
this.messenger.start();
setupRequestProcessors();
LearnerHandler.start();
bootstrap.bind(localAddress)绑定2181客户端通信端口,启动
addr = self.getElectionAddress();ss.bind(addr);监听选举端口
3、重新选举leader
s = ss.accept()等待连接
QuorumPeer.run()
for (LearnerHandler f : getLearners()) { f.ping(); }while死循环,leader给其它follower节点发送心跳
两台机器建立Socket连接之后是可以双向通信的。下面的判断?比如2向1发起连接,为了防止1又向2发起重复的连接
super.startup();
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);ByteBuffer b = lastMessageSent.get(sid); // 如果队列为空send(b);// 通过DataOutputStream向Socket连接发送数据
setFollower(makeFollower(logFactory));
return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))) ));1. 优先比较选举周期 ,大的优先2. 如果1相等,再比较事务修改ID,大的优先2. 如果1,2都相等,再比较两台机器的myid,大的优先
readPacket(qp);
switch (getPeerState())根据当前节点状态做不同的处理
setLeader(makeLeader(logFactory));
this.wrThread.start();接收选票线程
if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: font color=\"#ed9745\
拉取消息
fle.start(); 启动选举线程
0 条评论
回复 删除
下一页