ZK-3.5.8 服务端处理责任链
2023-06-14 11:40:53 3 举报
ZK-3.5.8 服务端处理责任链
作者其他创作
大纲/内容
leader
1. 如果是节点之间的同步请求,不走调用处理链2. 直接转发给下一个处理链:CommitProcessor3. 判断请求头不为空,是事务请求,propose生成提议(后面要广播提议,收集选票)4. syncProcessor处理器 记录事务日志(这个处理器在构造方法的时候会new)
return hzxid.incrementAndGet();
FinalRequestProcessor
run()
worker.execute(scheduledWorkRequest);
workRequest.doWork();
ZK为什么要将Netty piple中的 handler 设计成责任链模式?分工合作,业务解耦
底层就是通过Netty,给客户端发送WatchedEvent事件
follower
request.setHdr(null);request.setTxn(null);
处理ZAB协议在二阶段提交的请求
ScheduledWorkRequest.run()
CommitProcessor
. . . . . .省略
zks.processTxn(request);
request = committedRequests.poll();
送给下一个Processor
tryToCommit
queuedRequests.add(request);// 异步提升性能
sendBuffer(bb);
高 32 位 :leader选举周期当一轮新的选举结束后,选举周期加1,事务ID重新从0开始递增
Leader ID
case OpCode.createContainer:case OpCode.create:case OpCode.create2:
AckRequestProcessor.class
维护内存数据结构
NettyServerCnxn.class
nextProcessor.processRequest(i)
w.process(e);
ToBeAppliedRequestProcessor
case OpCode.getData:
leader写本地事务日志文件
往Leader本地 ackset 集合中放回头Follower 到时候也会往这里放
p.addAck(sid);
prepRequestProcessor.start();
rsp = new CreateResponse
private final AtomicLong hzxid = new AtomicLong(0);// 原子操作,不存在线程安全问题
PrepRequestProcessor
submittedRequests.take();
next
sendPacket(pp);
事务ID
flush(LinkedList<Request> toFlush)
响应客户端
创建对应类型的事务
这里请求头、事务都为空
switch (request.type)
processCommitted();
queuedRequests.add(request);
CommitWorkRequest.class
4、SyncRequestProcessor.class
commitProcessor.start();
检查是不是本地Session,创建临时节点会升级会话
processRequest(Request request)
服务端添加监听
封装成WatchedEvent
queuedPackets.add(p);
高32位
创建请求头
Leader会给自己发一个ACK
ProposalRequestProcessor
这里思考一个问题?zxid 是内存加那如果这个leader挂了,重新选举出新的leader怎么办?
sendToNextProcessor
nextProcessor.processRequest(request);
void run()
经过一系列校验:参数、权限、路径
queuedRequests.poll()
所以即使有一个follower 挂了,加上leader自己的ack,2个大于过半机制,也是可以写成功的
case 事务请求
getZKDatabase().font color=\"#000000\
添加到阻塞队列
等待线程被唤醒
LeaderRequestProcessor
封装成集群通信包QuorumPacket
pRequest(request);
低32位
回调这个路径下所有监听的客户端
按照请求顺序,单线程从队列取
font color=\"#323232\
firstProcessor
3、propose(request);
wait();
channel.writeAndFlush
这里判断是否满足过半机制满足则提交,不满足返回false
给每个follower发送提议,发送到 LearnerHandler 的阻塞队列在集群启动时,每个Follower在与Leader建立连接后,Leader会开启一个对应的LearnerHandler 线程,与Follower交互
next.processRequest(request);
dataWatches.triggerWatchchildWatches.triggerWatch
submittedRequests.add(request);// 异步提升性能
处理事务请求
封装客户端响应的结果
run方法
提交到线程池执行
0 条评论
下一页