ZooKeeper leader election + data synchronization (still slightly unfinished), following the source code
2021-11-01 08:32:20
zookeeper
Outline / Content
run()
run() starts the thread pool
QuorumPeer#start
Voting complete
}
// Wait for the leader node to finish starting up
learnerMaster.waitForStartup();
rqv = self.configFromString(new String(b)); QuorumVerifier curQV = self.getQuorumVerifier();
add()
Handle the command: readPayload();
Queue
Not LOOKING
this.zks = zks;this.nextProcessor = nextProcessor;
netty
State is LOOKING
return epoch;
RequestThrottler is a thread used for rate limiting; firstRequestProcessor.processRequest handles the session expiration time
......
Put the leader vote chosen by the current node into sendqueue
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)): the leader is elected from the votes inside this loop
ws.start()-run()
if (rqv.getVersion() > curQV.getVersion()): compare the received version with the local version
syncProcessor.processRequest(request);
zks.outstandingChanges.add(c);
omitted
EventLoopGroup bossGroup; EventLoopGroup workerGroup; ServerBootstrap bootstrap; CnxnChannelHandler channelHandler
requestThrottler.submitRequest(si);
wakeup();
requestThrottler.start();
quorumPeer.setCnxnFactory: Netty or NIO
fle.start()
Ensure proposals are committed in order: if (outstandingProposals.containsKey(zxid - 1))
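A minimal, hypothetical sketch of this in-order commit guard (the class and method names are illustrative, not ZooKeeper's API): a proposal at zxid may only be committed once zxid - 1 is no longer outstanding, mirroring the `if (outstandingProposals.containsKey(zxid - 1))` check above.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitOrderGate {
    // zxid -> pending operation; mirrors Leader's outstandingProposals idea
    private final Map<Long, String> outstandingProposals = new HashMap<>();

    void propose(long zxid, String op) {
        outstandingProposals.put(zxid, op);
    }

    /** True only when zxid may be committed now, i.e. zxid-1 is no longer outstanding. */
    boolean mayCommit(long zxid) {
        return !outstandingProposals.containsKey(zxid - 1);
    }

    void commit(long zxid) {
        outstandingProposals.remove(zxid);
    }

    public static void main(String[] args) {
        CommitOrderGate gate = new CommitOrderGate();
        gate.propose(1L, "create /a");
        gate.propose(2L, "create /b");
        assert !gate.mayCommit(2L); // zxid 1 is still outstanding, so 2 must wait
        gate.commit(1L);
        assert gate.mayCommit(2L);  // now zxid 2 can be committed
        System.out.println("ok");
    }
}
```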
this.messenger = new Messenger(manager); initializes the wsThread and wrThread threads
Once the quorum check passes, keep taking votes from recvqueue; if a newer vote is found, put it back into recvqueue and exit the current while loop
zks.getLeader().propose(request);
queuesToDrain.add(request.sessionId);
false
pRequest(request)
waitingForNewEpoch = false;
// Set the leader's acceptedEpoch and write it to file, recording this term's epoch
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll(); // notifyAll fires once more than half of the machines have agreed on the epoch
Not present
totalOrderPredicate decides by: 1) a newer election epoch; 2) the same epoch but a higher zxid; 3) the same zxid, then compare myid: ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
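The three-step comparison above can be run standalone. A sketch (a self-contained class, not the real FastLeaderElection, which also short-circuits on vote weight): newer epoch wins, then higher zxid, then higher myid.

```java
public class VoteOrder {
    // Same predicate as FastLeaderElection#totalOrderPredicate, minus the weight check
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        assert totalOrderPredicate(1, 5, 2, 3, 9, 1);  // 1) newer epoch wins, despite lower zxid
        assert totalOrderPredicate(1, 9, 1, 3, 5, 1);  // 2) same epoch, higher zxid wins
        assert totalOrderPredicate(3, 5, 1, 1, 5, 1);  // 3) same epoch and zxid, higher myid wins
        assert !totalOrderPredicate(1, 5, 1, 3, 5, 1); // loses the myid tie-break
        System.out.println("ok");
    }
}
```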
cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start();
this.wsThread.start(); // WorkerSender, the vote-sending thread
// Get the request queue for this request's sessionId
Deque<Request> sessionQueue = pendingRequests.get(request.sessionId);
If sessionQueue is not null:
Request topPending = sessionQueue.poll();
topPending.setHdr(request.getHdr());
topPending.setTxn(request.getTxn());
topPending.setTxnDigest(request.getTxnDigest());
topPending.zxid = request.zxid;
topPending.commitRecvTime = request.commitRecvTime;
request = topPending;
numWriteQueuedRequests.decrementAndGet(); // removed from the queuedWriteRequests queue
AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
setCurrentVote(makeLEStrategy().lookForLeader()); assigns the elected leader's vote to the current node
Leader.processAck
LEADING or FOLLOWING
true
nextProcessor
firstProcessor.
listener.start();
NETTY
Where the votes come from
ProposalRequestProcessor#processRequest
while (!stopped && requestsToProcess > 0
    // once the number of reads processed exceeds maxReadBatchSize, stop taking requests from queuedRequests for a while
    && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
    && (request = queuedRequests.poll()) != null) {
waitForEmptyPool(); blocks via wait()
QuorumPeerMain#main#initializeAndRun
Handle the workRequest: workerPool.schedule(workRequest);
Create the node; quite a lot of code, omitted
makeFollower(logFactory)
.observeLeader();
Quorum check: if (!p.hasAllQuorums())
Monitor JVM stop-the-world pauses: startJvmPauseMonitor();
Runs QuorumPeer#run, an infinite loop that checks the current node's state
Equal
Greater than
Continuously take Requests from queuedRequests and persist them; first try to add each Request to the queue
while (true) {
switch (type)
submitRequest(Request si)
wakeup();
process(m)
logicalclock.incrementAndGet() bumps the logical clock (election epoch)
Check whether vote n is null
Initialize the WorkerService thread pool; start the SelectorThreads, which receive read/write-ready events:
for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { } }
nextProcessor.processRequest(request)
recvqueue.offer(n);
Default constructor
Step three of the two-phase commit: send a COMMIT request to the follower nodes, commit(zxid);
Enqueue here
NIO
Quorum check: if (voteSet.hasAllQuorums()); without a quorum, break
sendqueue
Start the thread
If sid is a participant (leader or follower): if (isParticipant(sid)) { connectingFollowers.add(sid); }
run(): while (true)
LearnerCnxAcceptorHandler#run()
e.g. case OpCode.create2
Internally calls LeaderZooKeeperServer's setupRequestProcessors method
ACK phase
args.length == 1 && config.isDistributed(): cluster mode
After the leader finishes syncing data with the other nodes, it initializes the server so it can accept client requests; the RequestProcessor threads are started here, and client requests can then be handled
A dedicated thread accepts socket connections from learners; as long as the leader isn't shut down it keeps running, because it must wait for new learner nodes to connect
If false, break
case Leader.REVALIDATE
leader.lead();
submitRequest(Request request)
zkDb.loadDataBase(); initializes the database and sessions from the snapshot files and resets the zxid
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
Recover the epoch from the newest zxid in the dataTree; the zxid is 64 bits: the high 32 bits are the epoch, the low 32 bits are the counter
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); // the epoch from the current dataTree
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); // the epoch in the currentEpoch file
if (epochOfZxid > currentEpoch) throw new IOException
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
if (acceptedEpoch < currentEpoch) throw new IOException
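The 64-bit zxid split described above (high 32 bits epoch, low 32 bits counter) can be sketched with plain bit operations, in the spirit of ZxidUtils:

```java
public class ZxidBits {
    // High 32 bits of the zxid are the epoch
    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    // Low 32 bits are the per-epoch counter
    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(2L, 5L); // epoch 2, 5th transaction of that epoch
        assert getEpochFromZxid(zxid) == 2L;
        assert getCounterFromZxid(zxid) == 5L;
        // A new epoch resets the counter, so any zxid of epoch 3 sorts after every zxid of epoch 2
        assert makeZxid(3L, 0L) > zxid;
        System.out.println("ok");
    }
}
```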
syncWithLeader(newEpochZxid);
makeLeader(logFactory)
Vote send/receive logic
Loop over queuesToDrain to get each sessionId's sessionQueue, then loop over that sessionQueue checking each request's type; if it is a request that needs committing
protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<Request>();
switch (qp.getType()){
Netty receives the IO stream: channelHandler#channelRead
The leader commits locally: zk.commitProcessor.commit(p.request)
public void shutdown() {
    stop = true; // stops this round of leader election; initialized to false in new FastLeaderElection
    proposedLeader = -1;
    proposedZxid = -1;
    leadingVoteSet = null;
    LOG.debug("Shutting down connection manager");
    manager.halt();
    LOG.debug("Shutting down messenger");
    messenger.halt();
    LOG.debug("FLE is down");
}
.processRequest(request);
startLeaderElection();
thread.start();
This method belongs to this class
bootstrap.bind(localAddress)
LeaderZooKeeperServer.startup
Request-processing chain of responsibility (I wanted to draw this as a sequence diagram with swimlanes, but drawing multithreading on a 2D canvas is genuinely painful): LeaderRequestProcessor --> PrepRequestProcessor --> ProposalRequestProcessor --> CommitProcessor --> Leader.ToBeAppliedRequestProcessor --> FinalRequestProcessor
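The chain-of-responsibility pattern behind this processor pipeline can be sketched in a few lines. This is a toy model (simplified interface and names; the real RequestProcessor also declares shutdown() and the processors run on their own threads):

```java
interface Processor {
    void process(String request);
}

class LoggingProcessor implements Processor {
    static final StringBuilder TRACE = new StringBuilder(); // records the order processors ran in
    private final String name;
    private final Processor next; // null at the end of the chain

    LoggingProcessor(String name, Processor next) {
        this.name = name;
        this.next = next;
    }

    public void process(String request) {
        TRACE.append(name).append(">");
        if (next != null) {
            next.process(request); // hand the request to the next processor
        }
    }
}

public class Chain {
    public static void main(String[] args) {
        // Build the chain back to front, as setupRequestProcessors does
        Processor finalP = new LoggingProcessor("final", null);
        Processor commit = new LoggingProcessor("commit", finalP);
        Processor prep = new LoggingProcessor("prep", commit);
        prep.process("create /a");
        assert LoggingProcessor.TRACE.toString().equals("prep>commit>final>");
        System.out.println(LoggingProcessor.TRACE);
    }
}
```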
queuedWriteRequests.add(request);
receiveMessage(buf);
......firstProcessor.processRequest(si);
submitRequestNow(Request si)
When rstate == LOOKING and n.electionEpoch < logicalclock.get()
(NIOServerCnxn)cnxn.doIO(key)
Once the epoch is settled, notify the other nodes: I am the new leader and this term's epoch is ...; having won the election, the leader sends a NEWLEADER packet to the other servers
// Peek the head request of the committedRequests queue; this request has finished two-phase commit, so only the local commit remains
// Process committed head
request = committedRequests.peek();
if (!queuedWriteRequests.isEmpty() && queuedWriteRequests.peek().sessionId == request.sessionId && queuedWriteRequests.peek().cxid == request.cxid) {
startZkServer()
} while (!stoppedMainLoop)
getEpochToPropose()
poll()
super.start()
Standalone startup
self.shuttingDownLE = true; this flag drives the leader-election loop
queuedRequests.add(request);
n is not null; check the state in the vote: if it is OBSERVING, the node does not take part in voting, so break directly
processRequest(Request request)
self.getElectionAlg().shutdown()
Check n.electionEpoch > logicalclock.get(): compare the vote's election epoch with the current node's; if it is smaller, ignore the vote and break;
nio
SyncRequestProcessor then persists the request log (locally)
this.wrThread.start(); // WorkerReceiver, the vote-receiving thread
Return to the node-state loop and run the logic for the matching node type
addChangeRecord(ChangeRecord c)
case Leader.PING: heartbeat
startRequestThrottler(): requestThrottler = new RequestThrottler(this);
An infinite loop here takes items from the queue
FOLLOWING
startSessionTracker: starts a thread that closes expired sessions
while (commitIsWaiting && !stopped && commitsToProcess > 0) {
startup()
socket启动完后,准备读写事件
OBSERVING
connectingFollowers holds the participants; once more than half of the participants have negotiated the epoch, the new epoch is settled: if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers))
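A minimal sketch of this "more than half" test, with a simple majority verifier standing in for ZooKeeper's QuorumMaj (the class here is illustrative, not the real API):

```java
import java.util.HashSet;
import java.util.Set;

public class MajorityQuorum {
    final int clusterSize; // total number of voting participants

    MajorityQuorum(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // A quorum is strictly more than half of the participants
    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > clusterSize / 2;
    }

    public static void main(String[] args) {
        MajorityQuorum verifier = new MajorityQuorum(5);
        Set<Long> connectingFollowers = new HashSet<>();
        connectingFollowers.add(1L);
        connectingFollowers.add(2L);
        assert !verifier.containsQuorum(connectingFollowers); // 2 of 5: no quorum yet
        connectingFollowers.add(3L);
        assert verifier.containsQuorum(connectingFollowers);  // 3 of 5: quorum reached
        System.out.println("ok");
    }
}
```

This is why a 5-node ensemble tolerates 2 failures but a 6-node ensemble still only tolerates 2: the quorum size grows with cluster size.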
if (self.getPeerState() == ServerState.LOOKING): the local node is still LOOKING
follower.followLeader();
while (true){
addChangeRecord(nodeRecord);
connectOne(long sid)
cnxn.processMessage((ByteBuf) msg);
NETTY#start()
Java inheritance, the leader's case
Put the current node's own vote into sendqueue
Block the leader's main thread: while (true)
Validate the epoch
doIO(SelectionKey k)
No empty queue
Connect to the other nodes over sockets
lookForLeader
case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer:
acceptConnections()
Continuing with the main-line code
while (!stop.get()) {
    // Accept socket connections from learner nodes; each accepted socket starts a LearnerHandler thread
    acceptConnections();
}
Vote comparison criteria: first check the current vote's weight, self.getQuorumVerifier().getWeight(newId) == 0
NIO/NETTY
zk.loadData();
for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp);}
startServerCnxnFactory(); start it
totalOrderPredicate decides by: 1) a newer election epoch; 2) the same epoch but a higher zxid; 3) the same zxid, then compare myid: ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
Request handling, data synchronization
select()
...... Request request = submittedRequests.take(); ......
Send the votes: sendNotifications()
syncProcessor.start()
// The id can be a sessionId, so the commands of one session are always handled by the same thread
// In standalone mode, workers holds a single thread pool with multiple threads, and the id passed in is always 0
// In cluster mode, workers holds multiple pools of one thread each, and the sessionId decides which thread is used
// So in standalone mode the server hands commands to threads concurrently
int workerNum = ((int) (id % size) + size) % size;
ExecutorService worker = workers.get(workerNum);
worker.execute(scheduledWorkRequest); // thread pool
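The ((int)(id % size) + size) % size expression deserves a note: in Java, % can return a negative result for a negative sessionId, and the extra + size) % size folds it back into a valid index, so one session always maps to the same worker. A standalone sketch:

```java
public class WorkerPick {
    // Maps a (possibly negative) sessionId to a worker index in [0, size)
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        assert workerIndex(7L, 4) == 3;  //  7 % 4 ==  3
        assert workerIndex(-7L, 4) == 1; // -7 % 4 == -3; (-3 + 4) % 4 == 1
        assert workerIndex(-8L, 4) == 0; // -8 % 4 ==  0
        // The same sessionId always lands on the same worker
        assert workerIndex(-7L, 4) == workerIndex(-7L, 4);
        System.out.println("ok");
    }
}
```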
// Application-level receive queue of Notification business objects: recvqueue = new LinkedBlockingQueue<Notification>();
followLeader()
// If this is a write request, two-phase commit is needed: numWriteQueuedRequests.incrementAndGet();
NIO#start()
DatadirCleanupManager starts a timer that purges surplus log and snapshot files according to the configuration
logicalclock.set(n.electionEpoch); update our own election epoch
committedRequests.remove();
commitsToProcess--;
commitsProcessed++;
// Process the write inline.
// Handle the write request, passing it to the next processor
processWrite(request);
commitIsWaiting = !committedRequests.isEmpty();
Declare a variable ackstate and assign it the response's rstate, i.e. the voting node's state; wrap the response data into a Notification n
Some code omitted
Serialize the dataTree data read at startup and write it into a snapshot
if (h.getType() == OpCode.auth) {
    .........
} else if (h.getType() == OpCode.sasl) {
} else {
    if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
        .................
    } else {
        .................
        // Main logic: hand the request to requestThrottler, which applies rate limiting
        submitRequest(si);
    }
}
Check ackstate and n.electionEpoch
The request has already been committed (a COMMIT command was sent to the followers): committedRequests.add(request);
size()
If the cluster's agreed epoch is smaller than my own, something is wrong, so throw:
if (newEpoch < self.getAcceptedEpoch()) {
    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
    throw new IOException("Error: Epoch of leader is lower");
}
setFollower(makeFollower(logFactory));
queuedPackets.add(p)
wr.start()-run(); this is the core of the leader-election logic
Set up the RequestProcessor chain of responsibility: setupRequestProcessors()
next.processRequest(request);
do{
Start the SyncRequestProcessor thread: ((SyncRequestProcessor) syncProcessor).start();
// The leader is now in the discovery phase
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
Used later to bind the address and port: localAddress = addr;
Create QuorumPeer, which extends ZooKeeperThread (a thread)
submittedRequests.add(request);
queuedWriteRequests.poll();
queuedRequests.add(request)
Determine the node type (ephemeral, sequential, or TTL node)
lead()
Start the AcceptThread, which receives connection events: if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); }
for (LearnerHandler f : getLearners()) { f.ping(); }
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); send FOLLOWERINFO to the leader and receive the cluster's agreed epoch
if (key.isReadable() || key.isWritable()) {
    // Handle the client's commands and requests
    handleIO(key);
}
Quorum (more than half)
Wake up
Start the thread; run() polls for ready events via select()
startLeaderElection(); leader election
prepRequestProcessor.start();
runFromConfig
if (n.electionEpoch == logicalclock.get()): compare the received vote's election epoch with the current node's
nextProcessor.processRequest(request);
Equal
doWork()
handleIO(SelectionKey key)
Present
case Leader.REQUEST: a request forwarded to the leader
n == null means no vote was received
A thread executes CommitWorkRequest#doWork(), which then calls
case OpCode.deleteContainer:........ case OpCode.delete..........
setTxnFactory: initialize the transaction-log file object
setMyid: the server id from the config file
setZKDatabase: initialize the database object
QuorumPeer#loadDataBase()
looking
if (needCommit(request)){
createElectionAlgorithm
Check whether shuttingDownLE == true
makeObserver(logFactory)
The startup arguments are used here
// committedRequests being non-empty means some request has been committed (a COMMIT was sent to the followers, but the leader itself hasn't applied it yet)
// requests waiting to be committed, originating from clients' write requests
commitIsWaiting = !committedRequests.isEmpty();
// queuedRequests holds the pending requests (reads and writes)
// it contains every request this server has received
// normally every request in committedRequests also exists in queuedRequests
requestsToProcess = queuedRequests.size();
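A heavily simplified, single-threaded model of the CommitProcessor idea described above, under the assumption that reads pass straight through while a write is only applied once its commit has arrived (the real processor runs multithreaded, blocks, and preserves per-session order; all names here are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class MiniCommitProcessor {
    final Queue<String> queuedRequests = new ArrayDeque<>();    // every incoming request (reads and writes)
    final Queue<String> committedRequests = new ArrayDeque<>(); // writes the leader has already committed
    final StringBuilder applied = new StringBuilder();          // order requests were applied in

    void run() {
        String request;
        while ((request = queuedRequests.poll()) != null) {
            if (request.startsWith("write")) {
                // A write may only be applied once its COMMIT has arrived at the head of committedRequests
                if (request.equals(committedRequests.peek())) {
                    committedRequests.remove();
                    applied.append(request).append(";");
                }
            } else {
                applied.append(request).append(";"); // reads pass straight through
            }
        }
    }

    public static void main(String[] args) {
        MiniCommitProcessor cp = new MiniCommitProcessor();
        cp.queuedRequests.add("read:/a");
        cp.queuedRequests.add("write:/b");
        cp.committedRequests.add("write:/b"); // the leader has already sent COMMIT for this write
        cp.run();
        assert cp.applied.toString().equals("read:/a;write:/b;");
        System.out.println(cp.applied);
    }
}
```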
zks.submitRequestNow(request);
// Data to be synced is first placed in a queue; a thread is started here to send it
// Once this thread is running, anything destined for a learner just needs to be added to the queuedPackets queue
// i.e. sending is asynchronous
startSendingPackets();
Nothing much here, didn't feel like reading it
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
LearnerCnxAcceptor#run()
After leader election finishes, the leader server is initialized; the leader's server implementation class is LeaderZooKeeperServer: setLeader(makeLeader(logFactory));
Request request = submittedRequests.take();...........
(NIOServerCnxnFactory$IOWorkRequest)workRequest.doWork()
// If there is no quorum yet, block the thread calling this method
while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
    connectingFollowers.wait(end - cur);
    cur = Time.currentElapsedTime();
}
sendNotifications(): send a vote for myself
case Leader.ACK: the ACK of the two-phase commit
fh.run();
commit
sendPacket(qp)
Check the current node's state
super.startup()
super("SyncThread:
Handle the other command requests (create, delete, read, update): readRequest();
queuePacket(qp);
proposalProcessor.initialize();
LEADER
enqueueRequest(si);
} else { numReadQueuedRequests.incrementAndGet();}
submittedRequests.add(request);
LOOKING
// 1. Updates ZKDatabase
// 2. Fires watches
// 3. In cluster mode, adds the request to the committedLog queue
ProcessTxnResult rc = zks.processTxn(request);
The code after this looks long but there isn't much to it; it just builds the response for the corresponding request type
// The node blocks while receiving data sent by the leader
// create a reusable packet to reduce gc impact
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
    readPacket(qp);
    // handle it
    processPacket(qp);
}
This is the main-line code, not a branch flow
The epoch equals mine, or is greater than mine
ZooKeeperServerMain#main#initializeAndRun
setObserver(makeObserver(logFactory)); observer.observeLeader();
Exit the election loop