zookeeper源码流程
2022-08-25 15:12:03 1 举报
zookeeper leader选举机制 ZAB源码
作者其他创作
大纲/内容
往队列里放消息
ServerSocket ss = new ServerSocket(); ss.bind(self.getQuorumAddress());
this.wsThread.start();
while死循环根据当前节点状态做对应业务处理
监听是一次性的,触发一次在服务端会删除
//获取当前选票,一般leader Vote current = self.getCurrentVote();
ServerCnxnFactory.createFactory()
if (watcher != null) 将watch加入集合dataTree的watch集合,这里的watch就是NettyServerCnxn
逻辑相似
启动快速选举算法相关线程
CommitProcessor
Set<watch> watchs=watchs.get(chientPath)
封装的proposal
fllower
FinalRequestProcessor
create /gui_xie 666
同步数据给从节点
listener.start()
quorumPeer.start()
if (sid == self.getId())
recWorker消息接受线程
选举应用层(机器2)
票数过半
sid<-->SenderWorker
sendThread.getClientCnxnSocket().wakeupCnxn()
绑定端口启动
if (n.electionEpoch == logicalclock.get()) 选举PK拿受到的票和之前我投的票做PK
将命令发给服务端
follower.followLeader()
sid<-->recWorker
选举核心逻辑
wakeup用来唤醒阻塞在select方法上的线程,底层会往管道里写一个字节,写这个字节只是为了触发写事件,写事件触发后就会将待发送队列里的命令数据发给服务端
2.1 写本地数据文件
服务端leader
setupRequestProcessors();
LOOKING选举主线逻辑
是 票数过半
this.messenger.start();
处理连接信息
zk.loadData();
LearnerHandler.run
初始化集群选举leader相关数据
purgeMgr.start()异步线程清理快照文件
第一次启动没有选票,需要和发送选票的机器建立连接
绑定业务处理cnxnChannelHandle
new
this.snapLog.commit()
while循环发送选票
sendPackets()
服务端follower
din.readLong()
this.wrThread.start();
ToBeAppliedRequestProcessor
wait()
workRequest.doWork()
if (n.electionEpoch logicalclock.get()) 接收的选票选举周期大于自己的选举周期,这周情况可能自己后启动加入集群选举,或者网络中断后加入集群,其他机器已经选举过好几轮了,所以需要更新自己的选举周期到最新
commit(zxid)
myid=3
makeLEStrategy().lookForLeader()
recvqueue
myid=2
2.2 返回ack
for循环选票箱里收到的选票与本机leader选票做对比,如果相等,将投票机器的sid加入voteSet
sockKey.isReadable()收到服务端信息
给所有其它参与投票的节点发送选票到应用层发送队列
构建ack的packet发送给leader
sendqueue.offer(notmsg);
唤醒
syncProcessor.processRequest(request)
sendThread.start()
p.watchRegistration.register
worker.execute(scheduledWorkRequest)
rc = zks.processTxn(request)
SendWorker.run()
使用jute序列化
QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); processPacket(qp); }
ProposalRequestProcessor
LeaderRequestProcessor
循环给所有的follower发送packet
commitProcessor.start()
QuorumPeer.run()
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
获取数据
FOLLOWING 或LEADING这种一般是已选出leading集群有新机器加入了,新机器处于LOOKING状态会先投票给自己,其他机器接收到选票后会回发已选出的leader选票给新机器,这个选票发送方的状态就是FOLLOWING 或LEADING
txnLog.commit()
同步消息给observer
submitRequest(si)
while死循环接收leader同步的数据
syncProcessor.processRequest(request)
myid=1的机器,投出去(1,0),收到的票(2,0),将收到的票和自己投出去的对比,优先选择zxid大的为leader,zxid大的机器包含的数据是最新的,如果zxid一样,默认选myid大的为leader,推荐(2,0)成为leader
switch(getPeerState())
zks.getZKDatabase().commit()
将接收到的选票放入选票箱
selector.wakeup()
watcher.process(pair.event)
ClientCnxn.start()
this.mySid == sid)
myid=1
获取选票
requestcreate/gui_xie 666
pipeline.addLast(\"serverCnxnFactory\
从队列里获取消息
(nextProcessor->ToBeAppliedRequestProcessor) nextProcessor.processRequest
case OpCode.create
wakeup
syncWithLeader(newEpochZxid);
取出发送选票队列,发送选票
startServerCnxnFactory()
client
getInitLastLoggedZxid()
使用jute序列化的方式从输入流拿数据
PrepRequestProcessor#run()
设置本级状态 self.setPeerState((n.leader== self.getId()) ? ServerState.LEADING: learningState());
启动初始化加载
WorkerReceiver线程
processEvent(event)
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
CommitProcessor#run()
notifyAll()
receiveMessage(buf)
有NIO读写事件发生
消费
sendqueue.offer(notmsg);
等待server返回
QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); processPacket(qp);}
4 commit: 写自己的内存数据
true接收到的选票胜
数据转换为选票
zks.getNextZxid()
setPeerState(ServerState.LOOKING);
1.给发送选票sid这台机器创建一个选票发送器,将来用来发送选票2.将选票发送器与对应的sid放到map3.给发送选票sid机器初始化一个发送选票队列放入map4.启动选票发送线程
Object event = waitingEvents.take()
firstProcessor.processRequest(si)
sendBuffer(bb)
n==null
senderWorker消息发送线程
processCommitted()
如果发送选票方是选举状态,并且选举周期小于自己,则把PK的选票回发给发送选票方
WorkerSender线程
bootstrap.bind(localAddress)
返回选出的leader并设置到自己节点的currentVote属性
ackSet.size() > half
只在主节点用AtomicLong自增
Request request = submittedRequests.take()
1.1 发送proposal
replyHdr.getXid() == -1收到服务端数据变动返回事件
true
收到服务端命名执行完毕返回事件
hzxid.incrementAndGet()
接收
PrepRequestProcessor
AckRequestProcessor
closeSocket(sock); connectOne(sid);
SyncRequestProcessor#run()
3.4 回发节点数据变动通知给客户端,触发客户端监听事件3.5 返回客户端命令操作结果
submittedRequests.add(request)
readPacket(qp);
if (n.electionEpoch logicalclock.get()) 这种情况发送选票机器后启动加入集群选举,这种选票需要作废
生成选票
sendqueue
case OpCode.getData
process(m);
packet.wait()
nextPending.set(request)
adminServer.start()
watchers = watchTable.remove(path)
ZAB数据同步源码
startSendingPackets()
request里有一个是否监听的watch属性传到服务端,服务端会根据该属性做对应的监听处理
从队列中获取packet发给follower
recQueue.add(msg)
//更新自己的选举周期 logicalclock.set(n.electionEpoch);//清空之前的选票箱recvset.clear();
同步leader数据
ackSet.size()>half
manager.connectAll();
RecvWorker.run()
WorkerSender.run()
1 放入传输层待发送对接2与对方sid机器建立连接
queuedPackets.add(p)
CommitProcessor线程wait等待
Socket(BIO)
zks.getLeader().propose(request)
将packet放入队列
与leader建立连接并接收leader数据同步
leader请求处理链
new Zookeeper
for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); }
调用监听回调方法
3.2 发送消息observer存储消息
false自己胜
写内存数据
具体PK逻辑
follower.followLeader();
启动集群leader选举流程
服务端通过socketChannel接受客户端请求命令
LEADING
主动向leader发起socket连接
commitProcessor.commit(request)
初始化选举数据管理器
n.state发送选票方的状态
while死循环接受leader同步数据
zk.startup()
leader选举源码流程
p = queuedPackets.poll()
WorkerReceive.run()
2、给所有follower发送proposal
启动服务节点
节点变动通知客户端,客户端收到通知会触发监听回调方法调用
bug
if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey()); }
后续流程和leader类似
case Leader.COMMIT
初始化服务连接对象
将自己状态改为LOOKING进行下一轮while循环选举
唤醒客户端等待
SendAckRequestProcessor
selector.select(waitTimeOut)
loadDataBase()
myid=1的机器,投出去(2,0),收到的票(2,0),同一台机器的票数已经超过集群的半数,选举结束,确定(2,0)机器是leader
leader.lead();
p.notifyAll()
return endVote;
启动内嵌jetty服务
if (sid self.getId())
rsp = new CreateResponse(rc.path)
queuePacket
将request封装到packet中,将packet放入发送队列outgoingQueue中等待发送
processRequest(Request si)
调用下一个请求处理器
updateServerState()
获取本节点服务对象quorumPeer,设置一些参数
只分析electionType 3的情况
createCnxnManager()
如果leader挂了会触发异常执行
queueSendMap发送队列,每台机器对应一个发送队列
初始化选票(自己)
3.将数据存到本机日志文件
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING)
nextProcessor.processRequest(request)
follower请求处理链
sendToNextProcessor
唤醒之前commitProcessor线程的wait等待
zk.commitProcessor.commit(p.request)
queuedRequests.poll()
//放入应用层队列recvqueue.offer(n);
recQueue
本节点服务对象quorumPeer
Leader
queueSendMap.get(sid)send(b)
处理取出的选票
receiveConnection(client);
是本机处于选举状态
SyncRequestProcessor
选票PK拿收到的选票和投给自己的选票做PK
接受
如果leader挂了会触发异常执行finally
写内存数据,节点变动触发客户端监听
nextProcessor.processRequest
读取发送选票的机器ID
pRequest2Txn
if(ackstate == QuorumPeer.ServerState.LOOKING)如果发送选票方是选举状态,则把本机认为的leader选票回发给发送方
运行发送选票线程
FOLLOWING
runFromConfig(config)
过半选举leader逻辑
if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get()))sendqueue.offer(notmsg);
(w->NettyServerCnxn)w.process(e)
cnxnChannelHandle.channelRead
发送
observer
ss = new ServerSocket();ss.bind(addr); client = ss.accept();
默认端口8000,用来查看服务端状态信息
ClientCnxn.submitRequester
LearnerHandler#run()
eventThread.start()
将回调通知放入队列
注册log4j jmx
sock.write(p.bb)
执行命令create /gui_xie
makeFollower(logFactory)
从应用层发送队列里取选票
myid=3的机器,启动时发现集群里已经选举出来leader了,此时会让自己变为follpwer
cnxn.processMessage((ByteBuf) msg)
WatchedEvent we = new WatchedEvent(event);eventThread.queueEvent( we );
1.2 写本地数据文件1.3 给自己发ack
构建leader请求处理链
否已经选举出了leader
选举周期+1
startConnect(serverAddress);
ManagedUtil.registerLog4jMBeans()
watchs.add(watcher)
queuedRequests.add(request)
zk create
false
生成事务zxid处理客户端命令逻辑都是单线程从队列里拿数据处理,保证事务处理的顺序一致性
FollowerZooKeeperServer#setupRequestProcessors()
sendNotifications();
FollowerRequestProcessor
发送commit给所有的follower
nextProcessor.processRequest(si)
recvQueue.add(msg);
NettyServerCnxnFactory
sockKey.isWritable()往服务端发送消息
fzk.commit(qp.getZxid())
核心启动流程
写日志文件
启动选举监听
从传输层接收队列里取选票
//发送选票 sendNotifications();
使用log4j 中的JMX, 可以方便远程查看日志 和 动态的修改log4j的配置文件
加载文件数据到内存
while ((self.getPeerState() == ServerState.LOOKING) && (!stop))当前节点是选举状态会不断从应用层接收队列里拿选票做选举
QuorumPeerMain.main服务端启动
for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); }
等线程唤醒后返回客户端结果以及写内存数据
leader选举多层队列架构
监听follower节点的socket连接
logicalclock.incrementAndGet();
((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
和服务端建立nio通信
if (sid self.getId())开启接收选票线程
createElectionAlgorithm(electionType)
如果本级投的leader和本级sid一样则自己就是leader,否则是follower或observer
sendPacket(pp)
super.start()
1 如果发送选票的机器id小于当前机器则关闭连接,为了防止机器之间互相重复建立socket连接(双向的),zk不允许小的机器id连接大的机器id2.当前机器主动发起socket连接到选票的id较小的机器
客户端
1.发一条等待处理结果的消息让服务端线程在处理完后通知客户端结果
NettyServerCnxnFactory.start()
初始化netty线程组bossGroup workerGroup
ScheduledWorkRequest#run()
连接建立监听读写事件并处理
config.parse(args[0])加载配置文件到内存
队列里取数据
在选举端口监听连接选举使用普通的socket通信(BIO)
s = ss.accept();
选票接收机器是自己,放入自己的接收队列
sendThread.readResponse(incomingBuffer);
注册自己
将watch加入到path对应的watch集合里面去
3.1 leader收到半数以上ack发送commit
启动或leader宕机选举leader流程
运行接收选票线程
p.addAck(sid)
inform(p)
选举应用层(机器1)
outgoingQueue.add(packet)
startLeaderElection()
接收follower数据并开启线程处理
flush(toFlush)
发送选票给自己
waitingEvents.add(pair)
LOOKING选举状态
初始化LeaderZooKeeperServer数据
cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
break
CnxnChannelHandler.channelRead
队列里获取数据
最终将接收到的选票丢入recQueue队列异步处理
leader.lead()
注册自己到leader
case Leader.PROPOSAL
QuorumConnectionReqThread.run()
QuorumServer leaderServer = findLeader();
prepRequestProcessor.start()
pRequest(request)
启动netty服务
processRequest(Request request)
LeaderZooKeeperServer#setupRequestProcessors
voteSet.hasAllQuorums()
startZkServer();
case Leader.ACK
建立连接
leader给自己发ack
(next->FinalRequestProcessor) next.processRequest
返回客户端结果
finishPacket(packet)
0 条评论
下一页