zookeeper源码解析
2021-09-12 16:00:01 2 举报
AI智能生成
zookeeper进阶篇,源码,原理
作者其他创作
大纲/内容
作用
元数据存储
Dubbo:ZooKeeper作为注册中心
Kafka:分布式集群的集中式元数据存储,分布式协调和通知
Master选举
HDFS:Master选举实现HA架构
Canal、condis:分布式集群的集中式元数据存储,Master选举实现HA架构
分布式协调
Dubbo、Spring Cloud把系统拆分成很多的服务或者是子系统
ZooKeeper分布式锁
特性
顺序写
集群中只有一台机器可以写,所有机器都可以读
所有写请求都会分配一个zk集群全局的唯一递增编号,zxid,保证各种客户端发起的写请求都是有顺序的
数据一致性
任何一台zk机器收到了写请求之后都会同步给其他机器,保证数据的强一致
你连接到任何一台zk机器看到的数据都是一致的
数据同步是用什么协议做的
ZAB协议,原子广播协议
ZAB的核心思想
主从同步机制
2PC (两阶段提交)+ 过半写机制
1,发起一个事务proposal之前,leader会分配一个全局唯一递增的事务id,zxid,通过这个可以严格保证顺序
2,leader会为每个follower创建一个队列,里面放入要发送给follower的事务proposal,这是保证了一个同步的顺序性
3,每个follower收到一个事务proposal之后,就需要立即写入本地磁盘日志中,写入成功之后就可以保证数据不会丢失了
4,然后返回一个ack给leader,然后过半follower都返回了ack,leader推送commit消息给全部follower
5,leader自己也会进行commit操作
Leader收到事务请求,转换为事务Proposal(提议)同步给所有的Follower,超过半数的Follower都说收到事务proposal了,Leader再给所有的Follower发一个Commit消息,让所有Follower提交一个事务
崩溃恢复机制
如果Leader崩溃了,要重新选举Leader保证继续运行
选举一个leader出来,然后leader等待集群中过半的follower跟他进行数据同步,只要过半follower完成数据同步,接着就退出恢复模式,可以对外提供服务了
如果一个follower跟leader完全同步了,就会加入leader的同步follower列表中去,然后过半follower都同步完毕了,就可以对外继续提供服务了
过半机器选举机制
剩余机器超过一半,集群宕机不超过一半的机器,就可以选举新的leader,数据同步
只要有超过一半的机器,认可你是leader,你就可以被选举为leader
最终一致性
不是强一致性,zk官方给自己的定义:顺序一致性,他比最终一致性更好一点
1,有的follower已经commit了,但是有的follower还没有commit
2,不是说leader必须保证一条数据被全部follower都commit了才会让你读取到数据,而是过程中可能你会在不同的follower上读取到不一致的数据,但是最终一定会全部commit后一致,让你读到一致的数据的
数据一致性问题
01,Leader收到了过半的follower的ack,接着leader自己commit了,还没来得及发送commit给所有follower自己就挂了
就会选举一个拥有事务id最大的机器作为leader,他得检查事务日志,如果发现自己磁盘日志里有一个proposal,但是还没提交,说明肯定是之前的leader没来得及发送commit就挂了
02,leader可能会自己收到了一个请求,结果没来得及发送proposal给所有follower之前就宕机了
老leader自己磁盘日志里有一个事务proposal,他启动之后跟新leader进行同步,发现这个事务proposal其实是不应该存在的,就直接丢弃掉就可以了
zxid
64位的,高32位是leader的epoch,是leader的版本;低32位才是自增长的zxid
新leader选举出来,epoch会自增长一位
老leader恢复了连接到集群是follower了,此时发现自己比新leader多出来一条proposal,但是自己的epoch比新leader的epoch低了,所以就会丢弃掉这条数据
高性能
每台zk机器都在内存维护数据,所以zk集群绝对是高并发高性能的
如果你让zk部署在高配置物理机上,一个3台机器的zk集群抗下每秒几万请求没有问题
高可用
集群中挂掉不超过一半的机器,都能保证可用,数据不会丢失
高并发
基于纯内存数据结构来处理,并发能力是很高的、
只有一台机器进行写,但是高配置的物理机,比如16核32G,写入几万QPS
所有机器都可以读,3台机器的话,起码可以支撑十几万QPS
三种角色的机器
Leader
只有Leader是可以写的
客户端可以随便连接leader或者follower,如果客户端连接到follower,follower会把写请求转发给leader
Follower
Follower是只能同步数据和提供数据的读取
Leader挂了,Follower可以继续选举出来Leader
Observer
Observer也只能读但是Observer不参与选举
zk是适合写少的场景
大量的服务的上线、注册、心跳的压力,达到了每秒几万,甚至上十万,zk的单个leader写入是扛不住那么大的压力的
提供读服务,可以无限的扩展机器
读可以有每秒几万QPS
配置:peerType=observer
所有机器的配置文件,都要加入一个server.4=zk04:2888:3888:observer
核心机制
客户端与ZooKeeper之间的长连接
客户端就会跟zk建立连接,是TCP长连接
建立了一个会话,就是session,可以通过心跳感知到会话是否存在
sessionTimeout,意思就是如果连接断开了,只要客户端在指定时间内重新连接zk一台机器,就能继续保持session,否则session就超时了
数据模型
树形结构的znode,里面可以写入值,就这数据模型,都在zk内存里存放
持久节点
哪怕客户端断开连接,也一直存在
临时节点
客户端断开连接,节点就没了
顺序节点
创建节点的时候自增加全局递增的序号
案例应用
临时顺序节
分布式锁
加锁的时候,是创建一个临时顺序节点
zk会自动给你的临时节点加上一个后缀,全局递增的,编号
如果你客户端断开连接了,就自动销毁这个你加的锁,此时人家会感知到,就会尝试去加锁
持久节点
元数据存储
临时节点
分布式协调和通知(微服务)
Watch监听
客户端可以对znode进行Watcher监听
znode改变的时候回调通知你的这个客户端
在分布式系统的协调中应用
分布式系统的协调需求
分布式架构中的系统A监听一个数据的变化,如果分布式架构中的系统B更新了那个数据/节点,zk反过来通知系统A这个数据的变化
环境配置
8核16G,16核32G,高配置虚拟机最好了,SSD固态硬盘
3台机器,1个leader,2个follower,
leader主要是写,每秒抗几万并发写入是可以的;leader+follower,读,每秒抗个5万~10万的读是没有问题的
leader主要是写,每秒抗几万并发写入是可以的;leader+follower,读,每秒抗个5万~10万的读是没有问题的
机器如果有16G的内存,堆内存可以分配个10G,栈内存可以分配每个线程的栈是1MB,Metaspace区域可以分配个512MB都可以
设置垃圾回收器
新生代+老年代,ParNew+CMS,
如果是大内存机器,不建议这个组合了,就用G1回收所有的垃圾对象,还得设置一些G1的参数,region的大小,预期的每次GC的停顿时间是多少毫秒,比如100ms
如果是大内存机器,不建议这个组合了,就用G1回收所有的垃圾对象,还得设置一些G1的参数,region的大小,预期的每次GC的停顿时间是多少毫秒,比如100ms
参数
tickTime:zk里的最小时间单位,2000毫秒,2s
其他的一些参数就会以这个tickTime为基准来进行设置,比如有的参数就是tickTime * 2
dataDir:主要是放zk里的数据快照,剖析zk的源码的时候
内存里有一份快照,在磁盘里其实也会有一份数据的快照,zk停机了,重启,才能恢复之前的数据
dataLogDir:写数据,2PC,proposal(事务),每台机器都会写入一个本地磁盘的事务日志,主要是放一些日志数据
SSD固态硬盘,读写速度非常快,dataLogDir,事务日志磁盘写,是对zk的写性能和写并发的影响是很大的
initLimit
zk集群启动的时候,默认值10,10 * tickTime,20s
leader在启动之后会等待follower跟自己建立连接以及同步数据,最长等待时间是20s
zk里存储的数据量比较大了,follower同步数据需要的时间比较长,此时可以调大这个参数
syncLimit
默认值5,5 * tickTime,10s
leader跟follower之间会进行心跳,如果超过10s没有心跳,leader就把这个follower给踢出去了,认为他已经死掉了
数据快照
一份是在磁盘上的事务日志,一份是在内存里的数据结构,
理论上两份数据是一致的,即使是有follower宕机,也是内存里的数据丢失了,但是磁盘上的事务日志都是存在的
理论上两份数据是一致的,即使是有follower宕机,也是内存里的数据丢失了,但是磁盘上的事务日志都是存在的
有的follower没收到事务日志就宕机了,也可以在启动之后找leader去同步数据
每次执行一定的事务之后,就会把内存里的数据快照存储到dataDir这个目录中去,作为zk当前的一个数据快照
1000个事务对应的内存数据写入到dataDir里作为一个数据快照,继续此时事务日志里有1032个事务,
此时zk重启,他可以直接把包含1000个事务的快照直接加载到内存里来
此时zk重启,他可以直接把包含1000个事务的快照直接加载到内存里来
然后把1000之后的32个事务,1001~1032的事务,在内存里回放一遍,就可以在内存里恢复出来重启之前的数据了
snapCount:100000,默认是10万个事务,存储一次快照
10万个事务以内,不需要快照,因为直接读取事务日志,回放到内存就重建内存数据了
maxClientCnxns
一台机器上最多能启动多少个ZooKeeper客户端
有限制的,默认来说60
zk servers最多只能允许你的一台机器跟他建立60个连接
每次请求都创建一个zk客户端,跟他建立连接,进行通信,再销毁zk客户端,如果并发有很多个请求一起连接zk,此时会导致一台机器上有很多zk客户端,会被zk servers拒绝的
jute.maxbuffer
一个znode最多可以存储多少数据呢?1mb,1048575
server.1=zk01:2888:3888
3888端口,是用来在集群恢复模式的时候进行leader选举投票的
2888的端口,是用来进行leader和follower之间进行数据同步和运行时通信
事务日志和数据快照是如何进行定时清理的
autopurge.purgeInterval=1
autopurge.snapRetainCount=3
autopurge.snapRetainCount=3
后台自动清理掉多余的事务日志文件和数据快照文件
磁盘的事务日志有没有丢失的风险
第一个阶段里,各个机器把事务日志写入磁盘,此时一般进入os cache的,没有直接进入物理磁盘上去
commit提交的时候一般默认会强制把写的事务fsync到磁盘上去
forceSync:yes
commit的时候,需要fsync到磁盘上去
但是,有可能会丢失部分os cache里没刷入磁盘的数据,如果是leader宕机
leaderServers:yes
leader是否接受客户端的连接,写请求由follower转发给leader,leader主要接受follower的转发写请求进行处理
cnxTimeout:5000
在进行leader选举的时候,各个机器会基于3888那个端口建立TCP连接,在这个过程中建立TCP连接的超时时间
命令
echo conf | nc localhost 2181
conf(查看配置)、cons(查看连接)、crst(重置客户端统计)、dump(输出会话)、envi(查看环境)、ruok(检查是否在运行)、stat(查看运行时状态)、srst(重置服务器统计)、wchs(查看watcher信息)、wchc(输出watche详细信息)、wchp(输出watcher,以znode为单位分组)、mntr(输出比stat更详细的)
开启ZooKeeper的JMX端口观察内存
-Dcom.sun.management.jmxremote.port=21811
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
curator客户端框架
CuratorFramework client = CuratorFrameworkFactory.newClient( "localhost:2181", 5000, 3000, retryPolicy);
client.start();
client.start();
配置中心
curator的crud的操作,底层都是调用的原生zk的API
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/my/path", "100".getBytes());
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/my/path", "100".getBytes());
client.setData().forPath("/my/path", "110".getBytes());
client.delete().forPath("/my/path");
byte[] dataBytes = client.getData().forPath("/my/path");
监听和通知
注意:用原生的zk去注册监听器的话,监听子节点或者节点自己,如果发生了对应的事件,会通知你一次,但是下一次再有事件就不会通知了,zk原生的API里,需要你每次收到事件通知之后,都需要自己重新注册watcher,但是curator就不会有这个问题
Path
监听节点下一级子节点的增、删、改操作
监听节点下一级子节点的增、删、改操作
PathChildrenCache pathChildrenCache = new PathChildrenCache(
client, “/yh/config”, true);
pathChildrenCache.start();
client, “/yh/config”, true);
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
}
});
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
}
});
cache就是把zk里的数据缓存到了你的客户端里来
针对这个缓存的数据加监听器,去观察zk里的数据的变化
Node
监听节点对应增、删、改操作
监听节点对应增、删、改操作
final NodeCache nodeCache = new NodeCache(client, "/cluster");
nodeCache.start();
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
Stat stat = client.checkExists().forPath(“/yh/config”);
if(stat == null) { }
else { nodeCache.getCurrentData(); }
}
});
public void nodeChanged() throws Exception {
Stat stat = client.checkExists().forPath(“/yh/config”);
if(stat == null) { }
else { nodeCache.getCurrentData(); }
}
});
Tree
其所有的子节点操作进行监听,呈现树形目录的监听
其所有的子节点操作进行监听,呈现树形目录的监听
final TreeCache treeCache = TreeCache.newBuilder(client, “/yh/config”).setCacheData(true).setMaxDepth(2)
.build();
.build();
增加监听:
treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
//增加或修改
if(treeCacheEvent.getType().equals(Type.NODE_ADDED)||treeCacheEvent.getType().equals(Type.NODE_UPDATED)){
String value = new String(treeCacheEvent.getData().getData());
localCache.put(key, Optional.ofNullable(value));//加入本地缓存
}
//删除
if(treeCacheEvent.getType().equals(Type.NODE_REMOVED)){
localCache.invalidate(key);//删除本地缓存
}
});
// 没有开启模式作为入参的方法
treeCache.start();
}
treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
//增加或修改
if(treeCacheEvent.getType().equals(Type.NODE_ADDED)||treeCacheEvent.getType().equals(Type.NODE_UPDATED)){
String value = new String(treeCacheEvent.getData().getData());
localCache.put(key, Optional.ofNullable(value));//加入本地缓存
}
//删除
if(treeCacheEvent.getType().equals(Type.NODE_REMOVED)){
localCache.invalidate(key);//删除本地缓存
}
});
// 没有开启模式作为入参的方法
treeCache.start();
}
Leader选举
第一种Leader选举机制
指定的目录下创建一个子节点,创建一个临时顺序节点
获取到的子节点做一个排序,然后看看自己是不是第一个子节点
如果你发现自己不是leader,对自己上一个节点施加一个监听器
如果发现上一个节点不存在了,此时会重新再次尝试去创建一个znode,相当于是竞争成为leader
LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
leaderLatch.start(); // 尝试竞争为leader
leaderLatch.await(); // 直到等待他成为leader再往后走,异步化,底层会调用norifyall唤醒这个行代码
leaderLatch.start(); // 尝试竞争为leader
leaderLatch.await(); // 直到等待他成为leader再往后走,异步化,底层会调用norifyall唤醒这个行代码
第二种Leader选举机制
分布式锁来竞争成为leader的,如果说你获取到了锁,就说明你是leader,获取锁也是跟第一种一样,创建临时顺序节点
LeaderSelector leaderSelector = new LeaderSelector(
client,
"/leader/election",
new LeaderSelectorListener() {
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("你已经成为了leader......");
// 在这里干leader所有的事情,此时方法不能退出
Thread.sleep(Integer.MAX_VALUE);
}
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println("连接状态的变化......");
if(connectionState.equals(ConnectionState.LOST)) {
throw new CancelLeadershipException();
}
}
});
leaderSelector.start();
client,
"/leader/election",
new LeaderSelectorListener() {
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("你已经成为了leader......");
// 在这里干leader所有的事情,此时方法不能退出
Thread.sleep(Integer.MAX_VALUE);
}
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println("连接状态的变化......");
if(connectionState.equals(ConnectionState.LOST)) {
throw new CancelLeadershipException();
}
}
});
leaderSelector.start();
源码
注册Watcher监听器
CuratorFramework
CuratorZooKeeperClient
zookeeperFactory
注册watcher监听器
建立连接
启动一个线程,网络连接监听
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
创建原生ZooKeeper的客户端
session过期时间
watcher,监听zk里的事件的变更,回传给curator的framework
底层的源码,就会去跟zk的一台机器建立真正的TCP长连接
之后就会进行心跳,维护一个session,发送各种请求过去给zk server
注册watcher,zk有事件变更,会通过TCP长连接,反向通知你的ZooKeeper客户端,他会回调你提供给他的watcher
Curator实现的ZK读写锁
/locks/lock/__WRITE__{很多临时顺序节点},排队等待加写锁
判断你加写锁是否成功,看一下你在/locks/lock下面的位置,如果你的写锁临时顺序节点是在/locks/lock下面的第一个
如果没有获取到锁对哪个节点进行监听
对前一个节点加监听器
/locks/lock/__READ__{很多临时顺序节点},加读锁
去找到排在最前面的写锁,如果发现排在位置=0有一个写锁,此时获取读锁就一定失败
读锁的位置排在第一个写锁的前面,就可以获取读锁
如果没有获取到读锁,监听第一个写锁
羊群效用
解决写锁羊群效用
写锁只关注自己前面的写锁
解决读锁羊群效用
所有的读锁只关注监听离自己最近的前一个写锁
集群扩容与宕机 自动感知机制
集群中加入一台机器,自动在zk中写入一个znode,临时节点,一旦节点关闭或者宕机,临时节点自动消失。由集群Master控制节点监听zk目录子节点变化,自动感知集群中节点的上线和下线
集群里的元数据存储,无非就是对znode进行crud
监听与回调,对你需要感知的znode加监听器,回调通知你
master选举,无非也都是说创建一些临时顺序节点,排在第一位的就是leade
zk的核心源码
环境
ant安装
http://ant.apache.org/bindownload.cgi
下载apache-ant-1.10.5-bin.tar.gz
配置环境变量
ANT_HOME D:\apache-ant-1.10.5
path D:\apache-ant-1.10.5\bin
classpath D:\apache-ant-1.10.5\lib
path D:\apache-ant-1.10.5\bin
classpath D:\apache-ant-1.10.5\lib
zk
https://zookeeper.apache.org/releases.html#download
下载https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/
zk集群
peer架构
每台机器都可以成为leader,可以干leader的事儿,每台机器也都可以成为follower
里面任何一个节点都可以认为是一个peer
zk的进程,就是QuorumPeerMain进程,java命令,通过java命令启动一个jvm进程
zk是Peer-to-Peer的集群架构,里面有leader-follower的角色区分,在多个peers节点之间要选举一个Leader出来的话
quorum集群,多台机器组成了一个集群,peer,集群各个机器的角色都是一样的,能干的事儿都是一样的,他不是master-slave的架构
zk启动
如果配置了servers,就以集群模式启动
QuorumPeerMain启动
单机版启动
启动ZooKeeperServerMain类
Leader选举流程
三台机器,myid分别是0,1,2
quorum=(3/2 + 1)=2
达到了quorum的数量,此时就可以发起leader选举了
第一轮投票
myid=0的机器,投票,(0, 0),第二个0代表zxid,发送给当前集群里其他的机器
收到的票是(1,0),也就是myid=1的机器的投票
myid=1的机器,投票,(1, 0),发送给当前集群里其他的机器
收到的票是(0,0),也就是myid=0的机器的投票
优先是zxid最大的机器成为leader
默认就是让myid最大的机器成为leader,推荐(1,0)成为leader
第二轮投票
myid=0的机器,投出去(1,0),收到的票是(1,0)
票数已经达到了集群的quorum大多数了
选举就结束,确定(1,0)机器是leader
myid=0的机器,follower
myid=1的机器,leader
myid=1的机器,leader
myid=2的机器,刚刚才启动,此时发现集群里已经选举出来leader了,此时自己让自己变成follower就可以了
一般来说如果你自己部署在windows上搞3台虚拟机,部署zk集群,一台一台接着启动,第二台机器往往是leader,第一台和第三台是follower,第二台是leader
myid=2的机器,(2,10)
zk里的序列化的协议是jute
类似于im系统里用的protobuf
序列化,你得把类转化为字节数组/字节流,通过网络传输类对象的字节流
反序列化,到了对方收到自己流之后,就把字节转换为一个类的对象
Follower在完成连接建立之后是如何向Leader进行注册的
leanrner里面的jute序列化(serialize方法)和字节流输出的一个过程
封装一个learnerInfo对象放入QuorumPacket进行序列化为字节流发送给leader
Leader是如何处理Follower的注册请求
从jute输入流读取数据
type,zxid
反序列化成对象,保存一些数据
如果follower刚跟leader连接后,会跟leader进行数据同步
数据同步完后,会接收follower的ack
后期leader会开一个线程不停的发送数据给follower
Session会话
建立连接主要说的是TCP物理连接,他其实还需要进行一些通信,建立一个Session会话
服务端会收到一个ConnectRequest请求
服务端拿到ConnectRequest请求后,jute反序列化成对象
session是由服务端构造开启的,客户端仅仅是发送ConnectRequest请求
createSession()
1,生成唯一的sessionId(64位)
二进制位运算
1,当前时间戳,左移24位,又右移8位
2,与myid的二进制左移56位进行或运算
3,sessionId++
最后一定是唯一的
2,在几个内存数据结构中放入这个session
3,对session计算它的过期时间以及特殊处理
expireTime就是session下一次的过期时间
session分桶机制
分桶(tickTime)过期时间管理
分桶(tickTime)过期时间管理
expirationInterval=2S间隔
zoo.cfg可以配置
分成一个个的桶,每个桶的长度就是2S,后面计算2S内有哪些session需要检测是否过期
(6/expirationInterval+1)*expirationInterval
(7/expirationInterval+1)*expirationInterval相等
达到效果:不同的expireTime得到相同的tickTime
提高管理的效率,每次处理多个session
分桶数据结构<expireTime,SessionSet>
expireTime(12:05):sessionSet(多个session)
expireTime(12:10):sessionSet(多个session)
session tracker而言就是不停的过期一个一个分桶
ping心跳
客户端是如何定期发送Ping心跳到服务端
客户端clientCnxn在run的时候会计算ping的时间
sessionTime一般会自己设置
假如120S
sendPing()客户端每隔一段时间发送ping到服务端
客户端把ping请求放到“等待发送队列”
同时唤醒底层的网络通信组件socket
底层selector监听writeable事件
服务端是如何接收和处理Ping心跳请求
底层selector监听readable事件
读出请求数据
1,任何的请求都会submitRequest()
2,都会touch一下session,更新他的expireTime,重新分桶
3,会交给RequestProcessor线程来进行处理
同步数据
follower写多少条数据到事务日志文件之后,会执行flush
1,返回ack给leader,会尽快的用while循环,把积压在内存队列里排队的proposal的请求全部都尽快的写入到磁盘上的事务日志里去
一旦将队列里积压的proposal都写入事务日志了,此时就可以执行flush了
2,如果你连续写入1000条数到事务日志里去,此时也会强制性执行flush以及后续的操作
follower要写入1000条事务日志之后,才能进行flush以及后续的处理
定期清理磁盘上不需要使用的文件
1~10000条数据,在这个事务日志文件01里
同时写一份数据快照,包含了1~10000条事务
10001~20000条数据,在这个事务日志文件02里
同时写一份数据快照,包含了1~20000条事务
这个快照就可以恢复数据
删除包含了1~10000条事务的数据快照
删除包含了1~10000条事务的日志
删除包含了10001~20000条事务的日志
crud操作
DataTree.java
对于zk来说,核心的数据结构,并不是文件目录树的结构
而是map接口,他所有操作都是针对一个path
所有watcher都是监听dataTree
创建节点
create [-s] [-e] path data acl s表示临时 e表示序号递增
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/demo01/world");
查询节点
ls path [watch] 后面可以跟watch监听某个节点
byte[] bytes = client.getData().forPath("/demo01/world");
获取路径数据的同时,可以对这个路径加一个监听器
childWatcher可以对这个znode的子节点变化进行监听
修改节点
set path data [version]
client.setData().forPath("/demo01/world","您好啊".getBytes());
删除节点
delete path [version]
client.delete().forPath("/hello5");
递归删除
rmr path
exists
判断当前节点是否存在
watch机制
//设置节点cache
TreeCache treeCache = new TreeCache(client,"/");
TreeCache treeCache = new TreeCache(client,"/");
//设置监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(data!=null){
//switch 判断各种可能性
switch (event.getType()){
case NODE_ADDED:
System.out.println("节点新增:路径:"+data.getPath()+"数据:"+new String(data.getData()));
break;
case NODE_REMOVED:
System.out.println("节点删除,路径:"+data.getPath()+"数据:"+new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println("节点修改,路径:"+data.getPath()+"数据:"+new String(data.getData()));
break;
}
}
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(data!=null){
//switch 判断各种可能性
switch (event.getType()){
case NODE_ADDED:
System.out.println("节点新增:路径:"+data.getPath()+"数据:"+new String(data.getData()));
break;
case NODE_REMOVED:
System.out.println("节点删除,路径:"+data.getPath()+"数据:"+new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println("节点修改,路径:"+data.getPath()+"数据:"+new String(data.getData()));
break;
}
}
如果请求发到follower,如何转发给leader
首先由nioServerCnxnFactory收到请求
进入FollowerRequestProcessor
写请求以字节流发送到leader
顺序节点是如何来处理的
后面会拼上一串递增的10位的数字
/kafka/brokers/brokers-0000000001
/kafka/brokers/brokers-0000000002
临时节点,如果你的客户端宕机了,临时节点会自动被删除掉
ephemeral,会全部删除掉
并且一定会触发相对应的watch监听器
proposal的zxid是如何创建
先检查当前session是否过期
每次创建节点,都会更新父节点的cversion值
只有leader才可以生成zxid
全局性的生成唯一的zxid,内部加了synchronized锁,,会不断的++
创建监听器
zookeeper最最核心的一块机制,监听和回调机制
对znode文件目录树的数据结构进行增删改查的操作,临时节点,顺序节点,最多只能实现让一些其他的分布式系统、大数据系统可以基于zk进行集群元数据的存储和管理
也需要其他的很多功能,协调、选举、高可用自动切换
提供监听和回调的机制,只有把这个功能给实现了,此时zookeeper才是工业级的
监听器
三种监听器
gateData时传一个watcher
getChildren时传一个watcher,对子节点加一个监听器
exists可以加一个watcher
监听是否存在
在客户端也要保存一份;在zk服务端也要保存一份
watcher会放入请求中
什么时候在客户端完成注册
ZKWatchManager.java
对每一个路径,都会有一系列的监听器
getData,getChildren,exists请求先发送出去
知道这些请求成功完成了,返回响应,此时调用了finishPacket方法,才会进行监听器的注册
只有保证从服务端成功请求之后,服务端表示已经注册监听器了,然后客户端再进行注册。
调用zookeeper.java的register注册方法
zk服务端处理查询请求中的watch标识
watcher会放入请求中发送给服务端
对于服务端而言,跟每个客户端的连接,都是一个nioServercnxn
NIOServerCnxn实现了watcher接口
gateData()的时候
dataWatches.addWatch(path,watcher);
watcher==客户端的连接
path->对应多个wathcer
wathcer(客户端连接)->对应多个path
如果说一旦客户端的session断开,zk服务端要进行一些清理,1,删除临时节点,2,以及删除客户端注册的一些监听器
增删改事件发生的时候触发监听器
DataTree.java
内存里znode树出现变化,执行commit之后,会触发一些watcher监听器
比如对一个节点加了监听器,增删改的时候
先增删改,然后触发回调
触发当前节点监听器
dataWatches.triggerWatch()
触发子节点监听器
childWatches.triggerWatch()
假如对目录增加了一个childWatcher
触发监听器后,直接把zk服务端注册的监听器给删除掉
所以原生api里,监听器是一次性的,数据变化触发了监听器,自动会删除这个监听器
所以才用curator去连续监听,此时Curator他会在底层自动进行重新注册
服务端触发监听器的时候,回调客户端连接
childWatches.triggerWatch()以后会调用process方法
会拿到一个watcherEvent事件
里面包括,类型,状态,path
sendResponse()序列化以后,直接响应客户端
zk客户端收到watch监听通知的请求之后,如何处理
clientCnxn读取请求
收到watchedEvent事件
客户端挂掉
zk服务端
会删除掉你所有的这个客户端创建的临时节点
2PC的模式去进行删除
删除了之后到commit的时候,删除了内存里的节点,一定会触发对应的监听器的
客户端加在服务端上的这些监听器都给干掉
zk服务端挂掉
客户端
客户端必须去找其他的机器,leader,follower,去建立长连接
建立会话,重新把自己内存里注册的那些监听器,在新建立连接的机器上去进行一个重新的注册
服务端
三种宕机
1,follower挂掉
2,leader挂掉
3,版本升级
客户端如何感知服务器宕机
客户端发送请求或ping
会执行socket.write()方法,这时候会感知到服务端挂掉了,抛异常出去
doIO抛异常
doTransport()抛异常
catch会捕获到异常
cleanup
客户端主动断开网络连接,关闭socket
然后再关闭,socketChannel
对多有pendingqueue(已发送等待响应的请求)标记失败完成
对还没发送出去的请求(sendqueue)标记器失败
清空自己的监听器(watcher)
发布一个断开事件
clientCnxn通信组件的run方法,进行重新连接
startconnect()
连接集群内其他的zkserver。连接下一台
连接成功后,重新进行监听器的注册
一旦默认监听器感知到说建立了新的连接,此时要重新去施加所有的监听器
follower宕机,leader是如何感知
follower是主动跟leader建立网络连接的
proposalRequestProcessor同步数据时(LearnerHandler线程),是可以感知到的
抛异常后捕获,socket关闭
挂掉一个follower,对leader的2PC过半写机制有影响吗
majority(超过集群节点个数的一半以上)
集群本身没有影响,还能用,读请求没有影响
但是新的增删改有影响,因为返回不了过半的ack,不会进行第二阶段的commit
所以要尽快的恢复follower,然后同步数据,返回ack
重启挂掉的follower
会跟leader进行数据同步,syncWithLeader()
LearnerHandler会发很多的数据,同步给follower
follower会收到一些proposal,然后返回ack
Leader宕机,各个follower会如何感知
连接在这台机器的客户端短时间内肯定会不停的重新连接
follower从leader读取数据异常,直接关掉连接
zk短时间内是灾难性的
follower感知到leader崩溃后,释放掉网络资源,执行Leaner的shutdown(),并把状态设置成looking
follower他会把所有客户端的连接全部断开,对于客户端而言的话,他会短时间内频繁报错
此时集群就需要重新进行一个leader选举
哪个follower接收到的zxid更大,或者zxid一样大,myid,两个人一定会投票给某一个人,两轮投票,结果出来
大约需要几百毫秒或者最多几秒钟就会选出来
选举出新的leader之后
follower会跟新的leader建立连接,LearnerHandler(发送请求和接受响应ack)跟他进行通信,进行数据恢复
另外,对外提供服务,所有的客户端会自动进行重连的,找到一台机器进行连接,读写请求继续执行,另外监听器重新施加
宕机的那台leader再次重启,他此时就会发现已经有leader了,此时他就是一个follower,跟leader建立长连接,然后进行数据的恢复,接着的话呢,整个集群就瞬间就可以恢复运作了
zk重启的时候,是如何加载磁盘上的数据进行恢复的
正常情况下,会把磁盘上事务日志快照重新加载到内存中
zkDataBase.java
调用loadDataBase()
他把最近的磁盘快照反序列化到内存,对快照进行回放
然后基于内存的数据与leader进行数据同步
0 条评论
下一页