zookeeper分布式一致性协议ZAB与watcher监听机制源码流程
2024-03-05 20:22:45 0 举报
ZooKeeper是一个开源的分布式协调服务,用于维护配置信息、命名、分布式锁等。它基于ZAB(ZooKeeper Atomic Broadcast)一致性协议,该协议结合了原子广播和崩溃恢复机制,以确保数据的强一致性。ZAB包含两个阶段:领导者选举和崩溃恢复。领导者负责接收客户请求和保证写操作的顺序性。崩溃恢复阶段处理领导者崩溃的情况,通过选举新领导者来保证系统的可靠性。ZooKeeper还提供了watcher监听机制,允许客户端注册监听器,以便在节点数据发生变化时获取通知。
作者其他创作
大纲/内容
committedRequests.add(request)
watcher.process(pair.event)
Leader服务端
wait()
CommitProcessor.run()
case Leader.PROPOSAL
真正提交数据到内存
processRequest
create
调用统一逻辑
zks.processTxn(request)
Request request = submittedRequests.take()
committedRequests.poll()
Leader.COMMIT
queuedRequests.add(request)
p.watchRegistration.register(err)
nextProcessor.processRequest(request)
syncWithLeader(newEpochZxid)
leader
FollowerRequestProcessor
消费
WatcherEvent event = new WatcherEvent();eventThread.queueEvent( we )
processRequest(Request si)
p.watchRegistration != null
getData()方法的watch为true会走这段逻辑
new Zookeeper()
this.snapLog.commit()
createNode
从队列取出
将watcher加入到path对应watcher的集合里面
submittedRequests.add(request)
Leader.INFORM
循环发送所有follower
channel.writeAndFlush()
LeaderZooKeeperServer.setupRequestProcessors()初始化
FollowerZooKeeperServer.setupRequestProcessors()
sock.write(p.bb)
selector.select(waitTimeOut)
获取数据包
queuedPackets.poll()
阻塞等待IO事件发生
w.process(e)
waitingEvents.add(pair);
wakeup()
for (LearnerHandler f : forwardingFollowers) {f.queuePacket(qp);}
Object event = waitingEvents.take()
sendObserverPacket(qp)
服务端接收数据
Zookeeper.getData()
outgoingQueue.getFirst();
containsQuorum(qvAckset.getAckset())
参考leader选举流程源码
FinalRequestProcessor
client
sockKey.isReadable()
processEvent(event)
queuedPackets.add(p)
watchTable.remove(path)
跟服务端建立NIO连接
p.createBB()
响应客户端
sendPacket(pp)
zk.commitProcessor.commit(p.request)
Zookeeper.create()
PrepRequestProcessor.run()
hzxid.incrementAndGet()
唤醒等待线程
finishPacket(packet)
sendThread.start()
ProposalRequestProcessor
case OpCode.getData注意关注watcher操作
hasAllQuorums()
将数据发送给服务端
watchers.add(watcher)
pRequest(request)
wakeup用户唤醒阻塞在select上的线程
PrepRequestProcessor
LearnerHandler.run()
放入到commit队列
case OpCode.create
放入回调通知队列
selector.wakeup()
初始化另外两个链条
receiveMessage(buf)
processCommitted()
syncProcessor.processRequest(request)
p.notifyAll()
存入队列
while(true)死循环获取事件
leader接收消息
调用监听回调
SyncRequestProcessor.run()
sendPackets()
submitRequest(si)
Follower节点
outgoingQueue.add(packet)
zk.startup()
是否半数以上
while (this.isRunning()) {readPacket(qp);processPacket(qp);}
AckRequestProcessor
触发客户端监听事件
唤醒
firstProcessor.processRequest(si)
sendThread.readResponse(incomingBuffer)
zks.getZKDatabase().commit()
flush(toFlush)
CommitProcessor
sockKey.isWritable()
写日志文件
EventThread.run()
zks.getLeader().propose(request)
序列化PROPOSAL消息发送出去给follower
cnxn.processMessage((ByteBuf) msg)
存放数据
节点变动会通知客户端,客户端收到会回调监听方法
数据放入队列
sendBuffer(bb)
sendPacket(qp)
eventThread.start()
Set<Watcher> watchers = watches.get(clientPath)
leader.lead()
p.addAck(sid)
这里的watcher是客户端的netty连接对象即NettyServerCnxn
ToBeAppliedRequestProcessor
LeaderRequestProcessor
toFlush.isEmpty()
startSendingPackets()
nextProcessor.processRequest(si)
CommitWorkRequest.doWork()
queuedRequests.poll()
si = queuedRequests.poll()
OpCode.create
往队列存放消息
ackSet.size() > half
收到ack过半
txnLog.commit()
finally
LeaderZooKeeperServer.setupRequestProcessors()
replyHdr.getXid() == -1
通过jute序列化后封装到ByteBuf里面
异步线程池调用
SendThread.run()
接收leader消息
给所有follower发送commit消息
这里即客户端初始化的watcher
生成事务id
构建leader请求处理链
发送数据给observer
case Leader.ACK
SyncRequestProcessor
SendAckRequestProcessor
packet.wait()
leader提交commit数据
startConnect(serverAddress)
notifyAll()
ClientCnxn.start()
sendToNextProcessor(request)
sendThread.getClientCnxnSocket().packetAdded()
从队列取出消息
CommitProcessor线程wait等待
ScheduledWorkRequest.run()
监听是一次性的
在leader选举的时候,选出leader会创建一个leader对象
有NIO事件发生
CnxnChannelHandler.channelRead()
建立连接后监听读写事件并处理
inform(p)
发送ACK消息给leader
zks.getNextZxid()
异步线程
取出数据
commit(zxid)
follower.followLeader()
this.watcher = watcher
0 条评论
下一页