ZAB源码梳理
2022-10-27 11:12:17 5 举报
zab流程梳理
作者其他创作
大纲/内容
CommitProcessor
sendBuffer(bb);
调用监听回调方法
watcher.process(pair.event);
replyHdr.getXid() == -1收到服务端数据变动返回事件
SyncRequestProcessor
中间省略一些步骤
setupRequestProcessors();
pRequest2Txn
功能类似
commitProcessor.start();
从队列取消息
notifyAll();
Follower请求处理链
唤醒后处理流程与leader类似
follower.followLeader();
处理数据
super.startup();
leader.lead();
wakeup();
给follower发数据
processEvent(event);
!toFlush.isEmpty()
WatcherEvent event =newWatcherEvent();eventThread.queueEvent( we );
唤醒之前阻塞的的commitProcessor线程
request = submittedRequests.take();
processPacket(qp);
syncWithLeader(newEpochZxid);
containsQuorum(qvAckset.getAckset())
PrepRequestProcessor
ScheduledWorkRequest.run()
watchers.add(watcher);
channel.writeAndFlush
Leader初始化时设置执行器链
NettyServerCnxn
将数据通过jute序列化后装入bytebuffer
sendThread.getClientCnxnSocket().packetAdded();
prepRequestProcessor.run();
(ackSet.size() > half);
jute序列化
eventThread.start();
LearnerHandler.run()
for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); }
while (this.isRunning()) { readPacket(qp); processPacket(qp); }
数据写本地文件
服务端follower
getClientCnxnSocket()
过半,leader发送commit给所有的follower
AckRequestProcessor
p.createBB();
sendPacket(pp);
p.hasAllQuorums()
this.watcher = watcher;
ProposalRequestProcessor
create命令不会走该逻辑,getdata的watch为true时会走该逻辑,因为getdata方法中会初始化watchRegistration
往队列里放消息
commitProcessor.commit(request);
firstProcessor.processRequest(si);
paths.remove(path);
生成事务id,处理客户端命令逻辑都是单线程从队列取,保证事务处理的顺序一致性
queuedRequests.add(request);
startConnect(serverAddress);
sendPacket(qp);
服务端
event = waitingEvents.take();
queuedRequests.add(request);
while (true)
sendToNextProcessor(request);
startSendingPackets();
zk.commitProcessor.commit(p.request);
si = queuedRequests.poll();
zooKeeper.create
LeaderRequestProcessor
Request si = new Request
zks.getNextZxid()
FinalRequestProcessor
queuedRequests.run();
for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); }
syncProcessor.processRequest(request);
等待I/O
zk.startup();
会唤醒阻塞在seletor上的线程,底层通过发送一个字节触发写事件,写事件触发后会将待发队列中的命令发给服务端
CommitProcessor
createNode
构建ack的packet返回给leader
this.watchManager.defaultWatcher = watcher;
case OpCode.create:
fzk.commit(qp.getZxid());
wait();
finally
pRequest(request);
sock.write(p.bb);
request = queuedRequests.poll()
ClientCnxnSocketNIO
将watcher加入到path对应的watcher集合中
channel.writeAndFlush
调用下一个请求处理器
服务端leader
FollowerRequestProcessor
sendBuffer(bb);
获取节点数据
同步消息给observer
客户端
case OpCode.getData:
prepRequestProcessor.start();
sendPackets();
clientCnxnSocket.connect(addr);
只在主节点原子自增
jute序列化后发送
sockKey.isReadable()
processQueuedBuffer();
if (p.watchRegistration != null) { p.watchRegistration.register(err); }
nextPending.set(request);
case Leader.COMMIT:
sendThread.start();
OpCode.create:
建联后监听事件并处理
节点变动触发客户端监听
flush(toFlush);
zks.getLeader().propose(request);
FollowerZooKeeperServer:setupRequestProcessors();
outgoingQueue.add(packet);
nextProcessor.processRequest(upgradeRequest);
将请求数据提交给服务端
snapLog.commit();
commit(zxid);
p.notifyAll();
wait();
创建zk了客户端
queuedPackets.add(p);
waitingEvents.add(pair);
watchTable.remove(path);
nextProcessor.processRequest(request);
p.addAck(sid);
packet.wait();
w.process(e);
处理I/O
commitProcessor.start();
syncProcessor.processRequest(request);
inform(p);
workRequest.doWork();
txnLog.commit();
submitRequest(si);
FinalRequestProcessor
case Leader.ACK:
p = queuedPackets.poll();
nextProcessor.processRequest(i);
worker.execute(scheduledWorkRequest);
zks.processTxn(request);
SyncRequestProcessor
leader写过数据给自己ack
Watcher(NettyServerCnxn)
接收到的ack是否过半
ToBeAppliedRequestProcessor
创建节点数据
case Leader.PROPOSAL:
ZookeeperServer.startup();
commitProcessor.run();
selector.wakeup();
queuePacket
发送给服务端
run
cnxn.start();
selector.select(waitTimeOut);
写事件:sockKey.isWritable()
加载数据到内存
sendThread.readResponse(incomingBuffer);
将请求封装到Packet中并罚入发送队列outgoingqueue等待发送
设置为默认监听
processCommitted();
监听是一次性的,触发后会在服务端删除
finishPacket(packet);
把NettyServerCnxn当作Watcher传了进去
startZkServer();
客户端写数据触发服务端handler读事件
submittedRequests.add(request);
new ZooKeeper
next.processRequest(request);
processRequest
zks.getZKDatabase().commit();
cnxn.processMessage((ByteBuf) msg);
节点变动通知客户端,客户端收到通知后触发监听回调
SendAckRequestProcessor
构建Proposal
hzxid.incrementAndGet()
rsp = new CreateResponse(rc.path);
发送给follower
Set<Watcher> watchers = watches.get(clientPath);
与服务端建联
触发写事件
收藏
0 条评论
下一页