ZK_05写数据ZAB协议源码剖析(最终版)
2023-05-17 14:27:20 0 举报
zookeeperZAB协议源码剖析
作者其他创作
大纲/内容
CnxnChannelHandler.channelRead
pRequest2TxnCreate
next
pRequest(request)
processCommited
zls.processTxm
SyncRequestProcessor。run()
写入日志文件
queuedRequests.add(request)
责任链设计模式分工合作,业务解耦
next
for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); }
queueRequests.add
startConnect(serverAddress)跟服务端建立NIO连接
发一条等处理结果的消息让客户端线程在结果处理完成后通知客户端结果
cnxn.processMessage((ByteBuf) msg)处理传入消息
zks.getNextZxid()
nextProcess.processreuqest
CommitProcessor
notifyAll()
next-finalRequestProcess
cnxn.submitRequest
选举完后的的leader 在初始化leader对象LeaderZooKeeperServer.构建请求处理链接
flush(toFlush)
hasAllQuoruns()
sendThread.getClientCnxnSocket().packetAdded()
sock.write(p.bb)将命令发到服务端
requset里有一个是否监听的watch属性传到服务期,服务端会根据该属性做相应监听处理
LeaderZooKeeperServer.setupRequestProcessors初始化firstProcessor
服务端通过socketChannel接受客户端请求命令
如果接受ack过半leader发送comiit给follower
create /pxg jdc
commit(zxid)
p.addACK(sid)
eventThread.start()
Object event = waitingEvents.take()从队列中取出事件
给所有follwer发送关proposal
zk.commitProcessor
receiveMessage(buf)
FinalRequestProcessor
submittedRequests.add(request)往队列中放消息
EventThread.run
watcher.process(pair.event)调用监听回调方法
queuedPackets.add(p)
有NIO读写事件发生
AckRequestProcessor
txnLog.commit()事务日志
将数据存储在本机日志文件中
sendPacket(qp)
workerPool.schedule
for(LearnerHandler f:forwardingFollowers){f.queuePacket(qp)
firstProcessor有很多实现类,定位方法:1.debug去找2.找初始化的地方
inform(p)
客户端
outgoingQueue.add(packet)放入发送队列中等待发送
服务端
ClientCnxn.start
sendToNestProcess
processEvent(event)处理事件
SyncRequestProcessor
wakeup用来唤醒阻塞在select方法上的线程,底层会往管道里写一个字节,这个字节是为了触发写事件,写事件触发后就会将待发送队列的命令数据发给服务端
scheduledwoek.dowork
selector.wakeup()
服务端follower
commitProcessor.start()
生成事务zxid处理客户端命令逻辑都是单线程从队列里 拿数据所以保证了事务处理的顺序一致性
nextProcessor.processRequest(i)
queuedRequests.add(request)
fzk.commit(qp.getZxid())
commitProcessor.run()
wakeup
queuePacket封装request到packet中
hzxid.incrementAndGet() 原子方式将当前值加一
selector.select(waitTimeOut)等待IO事件发生
containsQuorum(qvAckset.getAckset())
case Leader.ACK
leader.processAck
用户登录注册流程开始
sendTread.radResponse
zk.create
submittedRequests.take()从消息队列中区消息
queuedRequests.take()
queuedRequests.poll()
submitRequest(si)
queueRequests.poll
syncProcessor.processRequest(request)
ProposalRequestProcessor
sendThread.start()
sendPacket(pp)
new ZooKeeper
ToBeAppliedRequestProcessor
case OpCode.create
nextProcessor.processRequest(request)
ackSet.size()>half
snapLog.commit();快照日志
prepRequestProcessor.run()
处理ack(leader字自己也发ack)
LearnerHandler.run(()
pRequest2Txn
LeaderRequestProcessor
循环所有发送ollwerpacketf
commitProcessor.commit(request)
构建leader请求处理链
QuorumPackrt qp = new QuorumPackrt();while((thisi.isrunning()){readPacket(qp);processPacket(qp)
PrepRequestProcessor
prepRequestProcessor.start()
zks.getLeader().propose(request)
nextPending.set
firstProcessor.processRequest(si)
nextProcessor.processRequest(request)
wait
0 条评论
下一页