Zookeeper 原子广播协议(ZAB)源码
2022-02-23 16:19:45 0 举报
Zookeeper 原子广播协议保证消息一致和故障恢复源码核心流程
作者其他创作
大纲/内容
ZK服务端(Follower)
run()
与commitProcessor中不是同一个队列
packet
add
创建责任链处理消息
监听只触发一次,触发后移除watchers = watchTable.remove(path);paths.remove(path);
case Leader.PROPOSAL:
request
processEvent(event);
调用下一个processornextProcessor.processRequest(request);
syncProcessor.processRequest(request);
其他流程和leader类似直接从创建follower开始
event
同leadersyncProcessor.processRequest(request);
执行创建命令zooKeeper.create(path)
case Leader.ACK:
添加watcher到watcherTable
zk.startup();
阻塞线程,直到leader发起commit命令wait();
nextProcessor
创建请求数据request 、执行后的回复数据response、请求头hRequestHeader h = new RequestHeader();CreateRequest request = new CreateRequest();CreateResponse response = new CreateResponse();
Selector
QuorumPeerMain.main(String[] args)
异步连接时,控制线程运行开关firstConnect = new CountDownLatch(1);
qvAckset.getAckset().add(sid);
根据配置进行启动runFromConfig(config);
commitProcessor.commit(request);
committedRequests.add(request);
建立Netty连接connectFuture = bootstrap.connect(addr);
处理leader 返回的数据while (this.isRunning()) { readPacket(qp); processPacket(qp); }
FinalRequestProcessorfinalProcessor
eventThread.run()
向ZK服务端写数据
firstProcessor.processRequest(si);
SendAckRequestProcessorsendAckRequestProcessor
getData方法watch为true时走if里面的逻辑
与eventThread中是一个队列add()
nextProcessor.processRequest(request);
创建ByteBufp.createBB();
processRequest
rc = zks.processTxn(request);
packet = pendingQueue.remove();
main.initializeAndRun(args);
submittedRequests队列
eventThread.queueEvent( we );
唤醒执行zookeeper命令的线程p.notifyAll();
receiveMessage(buf);
for (Watcher w : watchers) { (NettyServerCnxn)w.process(e); }
创建发送包Packet packet = queuePacket
同leaderCommitProcessorcommitProcessor
set
FollowerRequestProcessorfirstProcessor
take()
请求头类型不是ping且不是auth命令
processCommitted();
commit(zxid);
request = committedRequests.poll();
ACK确认过半向下执行否则return false
case Leader.COMMIT:
LearnerHandler fh = new LearnerHandlerfh.start();
反序列化获取服务端返回数据
ACK过半确认机制return (ackSet.size() > half);
syncWithLeader(newEpochZxid);
创建proposalProcessor时内部初始化子责任链
Request request = queuedRequests.take();
等待连接服务端成功再继续执行执行firstConnect.await
read事件
zookeeper命令放入发送队列outgoingQueue.add(packet);
WatchedEvent we = new WatchedEvent(event);
packet.wait();
zk.commitProcessor.commit(p.request);
setupRequestProcessors();
pRequest(request);
sendThread.start();
解析传入的配置文件config.parse(args[0]);
add(packet)
绑定Netty端口,等待客服端连接startServerCnxnFactory();
nextPending.set(request);
开启集群选举线程startLeaderElection(); super.start();
nextProcessor.processRequest(si);
sendPktOnly(p);
waitingEvents
触发读事件执行pipeline中的Handler方法
request = queuedRequests.poll()
SyncRequestProcessorsyncProcessor
sendToNextProcessor(pending);
连接事件发生时注册读事件
发给集群中所有的follower
ZK客服端
执行创建操作设置requestpRequest2Txn
queuedRequests.add(request);
选出Leadercase LEADING:
queuedRequests队列
设置连接状态为已连接state = States.CONNECTING;
committedRequests队列
向每个observer发送数据for (LearnerHandler f : getObservingLearners()) { f.queuePacket(qp); }
outgoingQueue
创建发送命令线程和事件处理线程sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
WorkerGroup
sendPacket(pp);
sendThread.readResponse(incomingBuffer);
case OpCode.create:
CommitProcessorcommitProcessor
for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); }
Proposal p = new Proposal();p.packet = pp;p.request = request;
ZK服务端(Leader)
processRequest()
follower.followLeader();
poll
submitRequest(si);
设置replyHdr到packet.replyHeader
创建watcher事件WatcherEvent event = new WatcherEvent();event.setPath(path)
设置服务端连接方式,这里使用Netty cnxnFactory = ServerCnxnFactory.createFactory();
LeaderRequestProcessorfirstProcessor
firstConnect.countDown();
next.processRequest(request);
向每个follower发送commit命令for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp);}
pendingQueue.add(p);
唤醒commitProcessor提交任务notifyAll();
刷新磁盘flush(toFlush);
调用责任链
sendThread.run()
开始连接到服务端startConnect(serverAddress);
未连接到服务端!clientCnxnSocket.isConnected()
if (replyHdr.getXid() == -1)服务端返回-1
doWork()
返回服务端的回复消息return r
leader.lead();
CnxnChannelHandler.channelRead(msg)
eventThread.start();
FollowerZooKeeperServer.setupRequestProcessors();
p.addAck(sid);
创建客服端对象cnxn = new ClientCnxn()
处理ACK包switch (qp.getType())
sendObserverPacket(qp);
poll()
if (p.watchRegistration != null) { p.watchRegistration.register(err); }
使用Netty进行服务端连接ClientCnxnSocketNettyclientCnxnSocket.connect(addr);
同leaderSyncRequestProcessorsyncProcessor
Request request = submittedRequests.take();
startZkServer()
watcher监听机制说明case OpCode.getData:
PrepRequestProcessorsyncProcessor
cnxn.start();
zks.getLeader().propose(request);
makeFollower(logFactory)
添加业务Handler到pipelineCnxnChannelHandler
ProposalRequestProcessorproposalProcessor
rsp = new CreateResponse(rc.path);
调用监听回调方法watcher.process(pair.event);
调用ZAB提交数据责任链
quorumPeer.start();
客服端服务端已建立连接
createNode
同leaderFinalRequestProcessorfinalProcessor
BossGroup
finishPacket(packet);
选择网络连接方式NIO/Netty默认NIO本流程图采用Netty方式getClientCnxnSocket()
remove
创建ZAB提交数据责任链
notifyAll()
cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
si = queuedRequests.take();
ToBeAppliedRequestProcessortoBeAppliedProcessor
submittedRequests.add(request);
触发监听
AckRequestProcessorackProcessor
pendingQueue
channel.write(writeBuffer);
waitingEvents.take();
add()
waitingEvents.add(pair);
inform(p);
p.hasAllQuorums()
p.addAck(sid);
sendPacket(qp);
0 条评论
下一页