zookeeper
2023-02-17 21:59:30 4 举报
AI智能生成
zookeeper启动流程图
作者其他创作
大纲/内容
loadDataBase();
zookeeper加载数据文件
zookeeper加载数据文件
zkDb.loadDataBase();
cnxnFactory.start();
启动NIO或者Netty
启动NIO或者Netty
bootstrap.bind(localAddress);
绑定地址端口
绑定地址端口
收到数据
CnxnChannelHandler进行处理
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
CnxnChannelHandler进行处理
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
LeaderZooKeeperServer.setupRequestProcessors
初始化责任链
初始化责任链
LeaderRequestProcessor
PrepRequestProcessor
构建数据,zxid
构建数据,zxid
ProposalRequestProcessor
将消息分发给follower
将消息分发给follower
CommitProcessor
SyncRequestProcessor
持久化消息
持久化消息
AckRequestProcessor
TobeAppliedRequestProcessor
FinalRequestProcessor
处理数据
cnxn.processMessage((ByteBuf) msg);
cnxn.processMessage((ByteBuf) msg);
receiveMessage(buf);
zks.processPacket(this, bb);
submitRequest(si);
在LeaderRequestProcessor初始化
构建责任链处理
firstProcessor.processRequest(si);
构建责任链处理
firstProcessor.processRequest(si);
放入队列
submittedRequests.add(request);
submittedRequests.add(request);
PrepRequestProcessor.start()
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
构建zxid
构建zxid
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
nextProcessor.processRequest(request);
ProposalRequestProcessor
sendPacket(pp);
sendPacket(pp);
遍历follower,发送数据 Leader.class
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
将消息放入队列
写数据文件,run方法
syncProcessor.processRequest(request);
syncProcessor.processRequest(request);
flush(toFlush);
写文件
zks.getZKDatabase().commit();
zks.getZKDatabase().commit();
AckRequestProcessor
nextProcessor.processRequest(i);
nextProcessor.processRequest(i);
给自己发送一个ACk
leader.processAck(self.getId(), request.zxid, null);
leader.processAck(self.getId(), request.zxid, null);
加入集合
p.addAck(sid);
p.addAck(sid);
判断是否可以提交
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()
inform(p);
响应是否超过半数
return (ackSet.size() > half);
return (ackSet.size() > half);
超过半数,发送给follower
commit(zxid);
commit(zxid);
构建commit类型数据包
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
inform(p);
发送数据包给Observer
自己commit
zk.commitProcessor.commit(p.request);
zk.commitProcessor.commit(p.request);
sendToNextProcessor(pending);
构建内存写数据
FinalRequestProcessor.processorRequest
rc = zks.processTxn(request);
FinalRequestProcessor.processorRequest
rc = zks.processTxn(request);
createNode
回复监听事件
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
监听是一次性的
watchers = watchTable.remove(path);
watchers = watchTable.remove(path);
case OpCode.getData
zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
getDataRequest.getWatch() ? cnxn : null);
把watch放进去
dataWatches.addWatch(path, watcher);
dataWatches.addWatch(path, watcher);
处理完数据,回复客户端
cnxn.sendResponse(hdr, rsp, "response");
cnxn.sendResponse(hdr, rsp, "response");
channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(onSendBufferDoneListener);
startLeaderElection();
初始化选举相关数据
初始化选举相关数据
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
生成当前节点
生成当前节点
createElectionAlgorithm(electionType);
qcm = createCnxnManager();
初始化选举管理
初始化选举管理
listener.start();
启动监听
启动监听
ss = new ServerSocket();
创建serverSocket
Socket client = ss.accept();
socket接收连接
创建serverSocket
Socket client = ss.accept();
socket接收连接
handleConnection(sock, din);
处理连接信息
处理连接信息
sid = din.readLong();
读取发送选票的机器id
读取发送选票的机器id
如果机器id小于自己,就断开连接,并发送连接
大于就启动
SendWorker
RecvWorker
SendWorker
RecvWorker
snedWorker像发送连接的sid发送选票
RecvWorker
将接收到选票放入recevQueue
将接收到选票放入recevQueue
le = new FastLeaderElection(this, qcm);
启动快速选举线程
启动快速选举线程
启动WorkerSender
发送选票
发送选票
process(m);
处理选票
处理选票
如果是自己的sid,就放入自己recvQueue
addToSendQueue(bq, b);
将选票放入发送线程
将选票放入发送线程
connectOne(sid);
进行连接
进行连接
启动WorkerReceiver
接收选票
接收选票
self.getPeerState() == QuorumPeer.ServerState.LOOKING
如果当前自己还是Looking状态
如果当前自己还是Looking状态
是
已经选举出了leader
super.start();
case LOOKING:
logicalclock.incrementAndGet();
逻辑时钟+1
逻辑时钟+1
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
初始化选票自己
初始化选票自己
sendNotifications();
将选票发送出去
将选票发送出去
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
从别人那里获取读取
TimeUnit.MILLISECONDS);
从别人那里获取读取
如果为null创建连接
不为null
判断接收到的状态
也是looking的时候就进行选票PK
将胜出的选票再次发送
termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))
判断是否超过半数
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))
判断是否超过半数
更新自己状态
case FOLLOWING:
follower.followLeader();
connectToLeader(leaderServer.addr, leaderServer.hostname);
建立Socket连接
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
syncWithLeader(newEpochZxid);
zk.startup();
创建责任链
setupRequestProcessors();
setupRequestProcessors();
SyncRequestProcessor
SendAckRequestProcessor
FollowerRequestProcessor
死循环接收数据
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
case Leader.PROPOSAL:
fzk.logRequest(hdr, txn);
fzk.logRequest(hdr, txn);
写入本机
syncProcessor.processRequest(request);
syncProcessor.processRequest(request);
写回去
SendAckRequestProcessor.processRequest
SendAckRequestProcessor.processRequest
case LEADING:
makeLeader(logFactory)
创建leader监听
创建leader监听
leader.lead();
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
cnxAcceptor.start();
接收事件
s = ss.accept();
s = ss.accept();
LearnerHandler.start();
startSendingPackets();
将消息发送出去
oa.writeRecord(p, "packet");
oa.writeRecord(p, "packet");
接收消息,并且进行反序列化
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
case Leader.ACK:
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
startZkServer();
for (LearnerHandler f : getLearners()) {
// Synced set is used to check we have a supporting quorum, so only
// PARTICIPANT, not OBSERVER, learners should be used
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
f.ping();
}
一直循环pingFollwer
// Synced set is used to check we have a supporting quorum, so only
// PARTICIPANT, not OBSERVER, learners should be used
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
f.ping();
}
一直循环pingFollwer
0 条评论
下一页