zookeeperZAB协议源码
2022-09-20 14:43:07 0 举报
zookeeperZAB协议源码
作者其他创作
大纲/内容
receiveMessage(buf);
处理客户端命令
发送给所有的follwer
服务端Netty配置
case OpCode.getData:
这里的watcher是NettyServerCnxn
if (!isWaitingForCommit()) { wakeup(); }
监听器赋值
SendThread#run()
processRequest
2
监听是一次性的触发一次会删除
获取数据节点
7
while(true)
处理数据
ToBeAppliedRequestProcessor
nextProcessor.processRequest(request);
sendPacket(pp);
next.processRequest(request);
Object event = waitingEvents.take();
AckRequestProcessor
processCommitted();
submittedRequests.take();
EventThread#run()
3
zks.getLeader().propose(request);
写数据
返回数据给客户端
CommitProcessor#run()
queuePacket
客户端启动
submitRequest(si);
wait();
syncProcessor.processRequest(request);
watchers = watchTable.remove(path);
从outgoingQueue中不断取数据发送
channel.writeAndFlush
将数据保存到本机日志文件
queuedRequests.add(request);
case OpCode.create:
9
返回结果给客户端
CommitProcessor
ZookeeperServer#processPacket
处理proposal(三条主要逻辑)
zookeeper处理数据的责任链模式
有一个线程专门取数据
LeaderZooKeeperServer#setupRequestProcessors
创建两个线程
new ClientCnxn()
NettyServerCnxnFactory
LeaderRequestProcessor
ProposalRequestProcessor
判断是否ACK过半
p = outgoingQueue.remove();
rsp = new CreateResponse(rc.path);
5
FinalRequestProcessor
zooKeeper.create(\"/zk\
notifyAll();
channelHandler
(NettyServerCnxn)w.process(e);
服务端处理数据的处理器
cnxn.processMessage((ByteBuf) msg);
收到数据
ClientCnxn#queuePacket
同步数据给observer
processEvent(event);
si = queuedRequests.poll();
channelRead
ZAB重要流程
通过NIO发送给客户端客户端触发回调
submittedRequests有数据后交给线程处理
如果过半
加入watcher集合
clientConfig = new ZKClientConfig();
submittedRequests.add(request);
nextPending.set(request);
queuedRequests.poll()
唤醒哪个?
看看创建的逻辑
服务端涉及修改,zxid会增加
不断从waitingEvents中取数据
firstProcessor.processRequest(si);
queuedPackets.add(p);
1、客户端配置实例
将客户端发过来的数据构建成Proposal
发送commit给其他follower节点
等待线程被唤醒后返回客户端数据以及写内存数据
创建ClientCnxn实例
将数据加入队列中
proposalProcessor.initialize();
4
2、启动两个线程SendThreadEventThread
SyncRequestProcessor
8
p.addAck(sid);
自己给自己发ACK
处理事件
线程来取
sendBuffer(bb);
节点变动触发客户端监听
将事件发给客户端
pipeline.addLast(\"servercnxnfactory\
处理其他节点发来的消息,会专门有一个线程来处理。参考zk选举源码。
watchManager.defaultWatcher = watcher;
watcher机制
传输数据
写入数据到IO
createNode
1
PrepRequestProcessor
加入队列
给所有的follwer发送Proposal
发送命令数据
pRequest(request);
outgoingQueue.add(packet);
6
写内存数据 rc = zks.processTxn(request);
cnxn.start();
ZooKeeper zooKeeper = new ZooKeeper(\"127.0.0.1\
SyncRequestProcessor.start();--run()
写本地日志文件
收到ByteBuf
处理客户端发过来的包
zks.getZKDatabase().append(si)
取数据
PrepRequestProcessor#start()->run
sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread();
进行客户端处理数据流程
watcher.process(pair.event);
0 条评论
下一页